版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/qq_26654727/article/details/82752988
文章目錄
- kafka指令
- 1.常用指令
- 1.1 [ Adding and removing topics](http://kafka.apache.org/090/documentation.html#basic_ops_add_topic)
- 1.2 [Modifying topics](http://kafka.apache.org/090/documentation.html#basic_ops_modify_topic)
- 1.3 balancing leadership
- 1.4 checking consumer position
- 1.5 [Expanding your cluster(水準擴充叢集)](http://kafka.apache.org/090/documentation.html#basic_ops_cluster_expansion)
- 1.6 [自定義分區配置設定和遷移](http://kafka.apache.org/090/documentation.html#basic_ops_partitionassignment)
- 1.7 [Increasing replication factor(新增副本數)](http://kafka.apache.org/090/documentation.html#basic_ops_increase_replication_factor)
- 1.常用指令
1.1 Adding and removing topics
add:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
remove:
1.2 Modifying topics
add partition:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
--partitions 40
add configes:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
To remove a config:
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
delete config (不使用):
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
前提是需要加上配置
auto.leader.rebalance.enable=true
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
1.5 Expanding your cluster(水準擴充叢集)
- –generate: 在此模式下,給定主題清單和代理清單,該工具會生成候選重新配置設定,以将指定主題的所有分區移動到新代理。此選項僅提供了一種友善的方法,可在給定主題和目标代理清單的情況下生成分區重新配置設定計劃。
- –execute:在此模式下,該工具将根據使用者提供的重新配置設定計劃啟動分區的重新配置設定。 (使用–reassignment-json-file選項)。這可以是由管理者手工制作的自定義重新配置設定計劃,也可以使用–generate選項提供
- –verify: 在此模式下,該工具将驗證最後一次–execute期間列出的所有分區的重新配置設定狀态。狀态可以是成功完成,失敗或正在進行中
> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
3.該工具生成一個候選配置設定,将所有分區從主題foo1,foo2移動到代理5,6。 但請注意,此時分區移動尚未開始,它隻是告訴您目前的配置設定和建議的新配置設定。 應儲存目前配置設定,以防您想要復原它。 新的配置設定應儲存在json檔案中(例如expand-cluster-reassignment.json),以便使用–execute選項輸入工具,如下所示 -
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
1.6 自定義分區配置設定和遷移
cat custom-reassignment.json
{
"version": 1,
"partitions": [{
"topic": "foo1",
"partition": 0,
"replicas": [5, 6]
}, {
"topic": "foo2",
"partition": 1,
"replicas": [2, 3]
}]
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
1.7 Increasing replication factor(新增副本數)
邏輯和上兩個一緻 通過json檔案 手動修改副本資料位置
提供json檔案:
> cat increase-replication-factor.json
{
"version": 1,
"partitions": [{
"topic": "foo",
"partition": 0,
"replicas": [5, 6, 7]
}]
}
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic
bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin