Kafka Study Notes 04

The following notes are based on a course from Lagou Education, the education platform run by Lagou.

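Setting Up a Cluster for the High-Throughput Message Broker Kafka (3 Kafka Nodes, 3 ZooKeeper Nodes)

I. Cluster Requirements

1. Deployment design

2. Allocate six Linux machines: three for a three-node Kafka cluster and three for a three-node ZooKeeper ensemble

II. Preparation

Prepare six machines

1. Allocate three Linux machines for the three-node Kafka cluster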

kafkaCluster20 (192.168.80.20)

kafkaCluster21 (192.168.80.21)

kafkaCluster22 (192.168.80.22)

2. Allocate three Linux machines for the three-node ZooKeeper ensemble

zookeeperCluster10 (192.168.80.10)

zookeeperCluster11 (192.168.80.11)

zookeeperCluster12 (192.168.80.12)

3. Set the hostname of each of the six machines in /etc/hostname

4. Add the hostname-to-IP mappings to /etc/hosts on all six machines

192.168.80.10 zookeeperCluster10

192.168.80.11 zookeeperCluster11

192.168.80.12 zookeeperCluster12

192.168.80.20 kafkaCluster20

192.168.80.21 kafkaCluster21

192.168.80.22 kafkaCluster22
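
Because the same six mappings have to be added on every machine, the step can be scripted. The following is a minimal sketch, not part of the original course material: the function name add_host and the HOSTS_FILE variable are made up for illustration, and the script writes to a scratch file by default so that it can be tried without touching the real /etc/hosts.

```shell
# Sketch: append the six cluster mappings to a hosts file without
# duplicating entries that are already present.
# HOSTS_FILE is a hypothetical variable; it defaults to a scratch file.
HOSTS_FILE="${HOSTS_FILE:-$(mktemp)}"

add_host() {
    # $1 = IP address, $2 = hostname
    # grep -x matches the whole line, -F treats the pattern as a fixed string.
    grep -qxF "$1 $2" "$HOSTS_FILE" || printf '%s %s\n' "$1" "$2" >> "$HOSTS_FILE"
}

add_host 192.168.80.10 zookeeperCluster10
add_host 192.168.80.11 zookeeperCluster11
add_host 192.168.80.12 zookeeperCluster12
add_host 192.168.80.20 kafkaCluster20
add_host 192.168.80.21 kafkaCluster21
add_host 192.168.80.22 kafkaCluster22

cat "$HOSTS_FILE"
```

Running it a second time leaves the file unchanged, which makes it safe to repeat on a machine that was already partly configured. To apply it for real, it would be run as root with HOSTS_FILE=/etc/hosts.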

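III. Setting Up the ZooKeeper Ensemble (start with zookeeperCluster11)

3.1 Install the JDK on all three ZooKeeper machines (ZooKeeper depends on a Java runtime)

1. Upload the JDK to one of the ZooKeeper nodes (zookeeperCluster11 first)

lrzsz can be installed to upload files to the Linux server from XShell:

"Installing lrzsz under XShell to upload files to a Linux server", chuanchengdabing's blog on CSDN (blog.csdn.net)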

The upload succeeds.

2. Install the JDK

	# Install the JDK from the RPM package
	rpm -ivh jdk-8u261-linux-x64.rpm

Default JDK installation path:

	/usr/java/jdk1.8.0_261-amd64

3. Configure JAVA_HOME and the JDK environment variables

	# Edit the profile
	vim /etc/profile

	# Append these two lines at the end of the file
	export JAVA_HOME=/usr/java/jdk1.8.0_261-amd64
	export PATH=$PATH:$JAVA_HOME/bin
	# Save and quit vim

4. Reload the profile and check that the JDK is installed correctly

	source /etc/profile
	java -version
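
If java -version reports "command not found" after the reload, the usual cause is that $JAVA_HOME/bin never made it onto PATH. The snippet below is a small sketch of how that could be checked; path_contains is a hypothetical helper name, and the default JAVA_HOME value is simply the installation path used in this guide.

```shell
# Sketch: report whether a directory is one of the entries in PATH.
path_contains() {
    # $1 = directory to look for
    # Wrapping both sides in ":" lets the first and last entries match too.
    case ":$PATH:" in
        *":$1:"*) return 0 ;;
        *) return 1 ;;
    esac
}

# Fall back to this guide's install path when JAVA_HOME is not set.
JAVA_HOME="${JAVA_HOME:-/usr/java/jdk1.8.0_261-amd64}"

if path_contains "$JAVA_HOME/bin"; then
    echo "$JAVA_HOME/bin is on PATH"
else
    echo "$JAVA_HOME/bin is missing from PATH"
fi
```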

5. Copy the JDK to the other two ZooKeeper nodes (zookeeperCluster12 and zookeeperCluster10)

	# The parent directory /usr/java must already exist on each target node
	scp -r /usr/java/jdk1.8.0_261-amd64/ zookeeperCluster12:/usr/java/jdk1.8.0_261-amd64/
	scp -r /usr/java/jdk1.8.0_261-amd64/ zookeeperCluster10:/usr/java/jdk1.8.0_261-amd64/

The copy completes.

Configure the JDK environment variables on zookeeperCluster12 and zookeeperCluster10 in the same way as on the first node.

3.2 Install ZooKeeper on all three machines to form the ensemble (start with zookeeperCluster11)

1. Upload zookeeper-3.4.14.tar.gz to the Linux machine with lrzsz

2. Extract ZooKeeper to /opt

	tar -zxf zookeeper-3.4.14.tar.gz -C /opt

3. Copy the sample configuration file and edit it

	cd /opt/zookeeper-3.4.14/conf
	cp zoo_sample.cfg zoo.cfg
	vim zoo.cfg

	# Settings to change in zoo.cfg
	# Set the data directory
	dataDir=/var/dabing/zookeeper/data
	# Add one line per ensemble member: server.<myid>=<host>:<peer port>:<election port>
	server.2=zookeeperCluster10:2881:3881
	server.1=zookeeperCluster11:2881:3881
	server.3=zookeeperCluster12:2881:3881
	# Save and quit vim
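
The server.N lines follow a fixed pattern, so they can be generated from a list of id and host pairs instead of being typed by hand. This is only an illustrative sketch: the function name server_lines and the two port variables are assumptions, and the ports are the 2881 and 3881 values used in this guide.

```shell
# Sketch: print zoo.cfg server.N lines from "<myid>:<hostname>" pairs.
PEER_PORT=2881       # port followers use to connect to the leader
ELECTION_PORT=3881   # port used for leader election

server_lines() {
    # Each argument has the form <myid>:<hostname>.
    for pair in "$@"; do
        # ${pair%%:*} is the part before the colon, ${pair#*:} the part after.
        printf 'server.%s=%s:%s:%s\n' \
            "${pair%%:*}" "${pair#*:}" "$PEER_PORT" "$ELECTION_PORT"
    done
}

server_lines 1:zookeeperCluster11 2:zookeeperCluster10 3:zookeeperCluster12
```

The output could be appended to zoo.cfg. The id in each pair has to match the number later written to that host's myid file.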

4. Create the ZooKeeper data directory and the myid file, which uniquely identifies this server (server.1 in zoo.cfg)

	mkdir -p /var/dabing/zookeeper/data
	echo 1 > /var/dabing/zookeeper/data/myid

5. Configure the environment variables

	vim /etc/profile
	# Add
	export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
	export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
	export ZOO_LOG_DIR=/var/dabing/zookeeper/log
	# Save and quit vim, then apply the changes
	source /etc/profile

6. Copy /opt/zookeeper-3.4.14 to zookeeperCluster10 and zookeeperCluster12

	scp -r /opt/zookeeper-3.4.14/ zookeeperCluster10:/opt
	scp -r /opt/zookeeper-3.4.14/ zookeeperCluster12:/opt

7. Configure zookeeperCluster10

	# Configure the environment variables
	vim /etc/profile

	# Add the following after the JDK variables
	export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
	export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
	export ZOO_LOG_DIR=/var/dabing/zookeeper/log

	# Save and quit vim, then apply the changes
	source /etc/profile

	# Create the data directory and write this node's myid (server.2 in zoo.cfg)
	mkdir -p /var/dabing/zookeeper/data
	echo 2 > /var/dabing/zookeeper/data/myid

8. Configure zookeeperCluster12

	# Configure the environment variables
	vim /etc/profile

	# Add the following after the JDK variables
	export ZOOKEEPER_PREFIX=/opt/zookeeper-3.4.14
	export PATH=$PATH:$ZOOKEEPER_PREFIX/bin
	export ZOO_LOG_DIR=/var/dabing/zookeeper/log

	# Save and quit vim, then apply the changes
	source /etc/profile

	# Create the data directory and write this node's myid (server.3 in zoo.cfg)
	mkdir -p /var/dabing/zookeeper/data
	echo 3 > /var/dabing/zookeeper/data/myid
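
Each myid value has to agree with the server.N line for that host in zoo.cfg, and a mismatch is an easy mistake to make when the file is copied between nodes. As a sketch of how the value could be derived from zoo.cfg rather than typed by hand, the hypothetical helper myid_for_host below looks up the id for a hostname. The sample file is written to a scratch location for the demonstration, and the hostname is used directly inside a sed pattern, which is adequate for plain names like the ones in this guide.

```shell
# Sketch: look up a host's myid from the server.N lines of a zoo.cfg.
myid_for_host() {
    # $1 = path to zoo.cfg, $2 = hostname
    # Matches lines of the form server.<id>=<host>:<port>:<port>
    sed -n "s/^server\.\([0-9][0-9]*\)=$2:.*/\1/p" "$1"
}

# Scratch copy of the ensemble settings from this guide.
cfg="$(mktemp)"
cat > "$cfg" <<'EOF'
dataDir=/var/dabing/zookeeper/data
server.2=zookeeperCluster10:2881:3881
server.1=zookeeperCluster11:2881:3881
server.3=zookeeperCluster12:2881:3881
EOF

for host in zookeeperCluster10 zookeeperCluster11 zookeeperCluster12; do
    echo "$host -> myid $(myid_for_host "$cfg" "$host")"
done
```

On a real node, the result for that node's own hostname could be redirected into /var/dabing/zookeeper/data/myid.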

9. Start ZooKeeper

	# Run on zookeeperCluster10, zookeeperCluster11, and zookeeperCluster12
	zkServer.sh start

10. Check the ZooKeeper status on all three machines

	# On a follower node
	zkServer.sh status
	ZooKeeper JMX enabled by default
	Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
	Mode: follower

	# On zookeeperCluster12
	zkServer.sh status
	ZooKeeper JMX enabled by default
	Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
	Mode: leader

	# On the other follower node
	zkServer.sh status
	ZooKeeper JMX enabled by default
	Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
	Mode: follower

This shows that zookeeperCluster12 is the leader.
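
When the status is checked from a script rather than by eye, only the Mode line matters. The sketch below extracts it from status output supplied on standard input; zk_mode is a hypothetical helper name, and the sample text is the output recorded in this guide.

```shell
# Sketch: extract the role from `zkServer.sh status` output read on stdin.
zk_mode() {
    # Print whatever follows "Mode: " on the matching line.
    sed -n 's/^[[:space:]]*Mode: //p'
}

sample='ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: leader'

printf '%s\n' "$sample" | zk_mode
```

On a running node the function could be fed directly, for example with zkServer.sh status 2>&1 | zk_mode.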

IV. Setting Up the Kafka Cluster (start with kafkaCluster21)

4.1 Install the JDK on the Kafka cluster

1. Copy the installed JDK from any ZooKeeper node to the Kafka nodes

2. Configure the JDK environment variables

4.2 Install Kafka

1. Upload the Kafka archive

2. Extract it to /opt

	tar -zxf kafka_2.12-1.0.2.tgz -C /opt

3. Add Kafka to the environment variables

Configure the environment variables on all three Kafka nodes.

	vim /etc/profile

	# Add the following
	export KAFKA_HOME=/opt/kafka_2.12-1.0.2
	export PATH=$PATH:$KAFKA_HOME/bin

	# Apply the changes
	source /etc/profile

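4. Configure kafkaCluster21 (leave all other settings at their defaults)

	vim /opt/kafka_2.12-1.0.2/config/server.properties
	broker.id=0
	# Listen on port 9092 on all network interfaces of this node
	listeners=PLAINTEXT://:9092
	# Registered in ZooKeeper and handed to producers and consumers
	advertised.listeners=PLAINTEXT://kafkaCluster21:9092
	log.dirs=/var/dabing/kafka/kafka-logs
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka

The comments sit on their own lines because a properties file does not support trailing comments: anything after the value on the same line becomes part of the value. The same settings in the context of the file:

	############################# Server Basics #############################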
	# The id of the broker. This must be set to a unique integer for each broker.
	broker.id=0
	############################# Socket Server Settings #############################
	# The address the socket server listens on. It will get the value returned from
	# java.net.InetAddress.getCanonicalHostName() if not configured.
	#   FORMAT:
	#     listeners = listener_name://host_name:port
	#   EXAMPLE:
	#     listeners = PLAINTEXT://your.host.name:9092
	#listeners=PLAINTEXT://:9092
	listeners=PLAINTEXT://:9092
	#advertised.listeners=PLAINTEXT://your.host.name:9092
	advertised.listeners=PLAINTEXT://kafkaCluster21:9092
	############################# Log Basics #############################
	# A comma seperated list of directories under which to store log files
	log.dirs=/var/dabing/kafka/kafka-logs
	############################# Zookeeper #############################
	# Zookeeper connection string (see zookeeper docs for details).
	# This is a comma separated host:port pairs, each corresponding to a zk
	# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
	# You can also append an optional chroot string to the urls to specify the
	# root directory for all kafka znodes.
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka
           

5. Copy the configured Kafka installation from kafkaCluster21 to kafkaCluster20 and kafkaCluster22

	scp -r /opt/kafka_2.12-1.0.2/ kafkaCluster20:/opt/kafka_2.12-1.0.2
	scp -r /opt/kafka_2.12-1.0.2/ kafkaCluster22:/opt/kafka_2.12-1.0.2

6. Configure the Kafka environment variables on kafkaCluster20 and kafkaCluster22

7. Edit the configuration file on kafkaCluster20

	vim /opt/kafka_2.12-1.0.2/config/server.properties
	broker.id=1
	listeners=PLAINTEXT://:9092
	advertised.listeners=PLAINTEXT://kafkaCluster20:9092
	log.dirs=/var/dabing/kafka/kafka-logs
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka

The same settings in the context of the file on kafkaCluster20:

	vi /opt/kafka_2.12-1.0.2/config/server.properties
	############################# Server Basics #############################
	broker.id=1
	############################# Socket Server Settings #############################
	listeners=PLAINTEXT://:9092
	advertised.listeners=PLAINTEXT://kafkaCluster20:9092
	############################# Log Basics #############################
	log.dirs=/var/dabing/kafka/kafka-logs
	############################# Zookeeper #############################
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka
           

8. Configure kafkaCluster22

	vim /opt/kafka_2.12-1.0.2/config/server.properties
	broker.id=2
	listeners=PLAINTEXT://:9092
	advertised.listeners=PLAINTEXT://kafkaCluster22:9092
	log.dirs=/var/dabing/kafka/kafka-logs
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka

The same settings in the context of the file on kafkaCluster22:

	vi /opt/kafka_2.12-1.0.2/config/server.properties
	############################# Server Basics #############################
	broker.id=2
	listeners=PLAINTEXT://:9092
	advertised.listeners=PLAINTEXT://kafkaCluster22:9092
	############################# Log Basics #############################
	log.dirs=/var/dabing/kafka/kafka-logs
	############################# Zookeeper #############################
	zookeeper.connect=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka
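
Only broker.id and advertised.listeners differ between the three brokers, so the overrides can be produced from a single template, which avoids copy-and-paste slips in zookeeper.connect. The following is a rough sketch under that assumption; broker_overrides and ZK_CONNECT are made-up names, and the values are the ones used in this guide.

```shell
# Sketch: print the per-broker settings that this guide changes in
# server.properties. Everything else stays at the shipped defaults.
ZK_CONNECT="zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka"

broker_overrides() {
    # $1 = broker.id, $2 = hostname advertised to clients
    printf 'broker.id=%s\n' "$1"
    printf 'listeners=PLAINTEXT://:9092\n'
    printf 'advertised.listeners=PLAINTEXT://%s:9092\n' "$2"
    printf 'log.dirs=/var/dabing/kafka/kafka-logs\n'
    printf 'zookeeper.connect=%s\n' "$ZK_CONNECT"
}

broker_overrides 0 kafkaCluster21
echo
broker_overrides 1 kafkaCluster20
echo
broker_overrides 2 kafkaCluster22
```

Every broker must use the same zookeeper.connect string, including the /myKafka chroot, or it will register under a different path and not join the same cluster.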
           

9. Start Kafka

	# Run on kafkaCluster20, kafkaCluster21, and kafkaCluster22
	kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties

Starting Kafka on kafkaCluster21:

	kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties

Starting Kafka on kafkaCluster22, followed by its startup log:

	kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
[2020-11-12 06:59:46,812] INFO KafkaConfig values: 
	advertised.host.name = null
	advertised.listeners = PLAINTEXT://kafkaCluster22:9092
	advertised.port = null
	alter.config.policy.class.name = null
	authorizer.class.name = 
	auto.create.topics.enable = true
	auto.leader.rebalance.enable = true
	background.threads = 10
	broker.id = 2
	broker.id.generation.enable = true
	broker.rack = null
	compression.type = producer
	connections.max.idle.ms = 600000
	controlled.shutdown.enable = true
	controlled.shutdown.max.retries = 3
	controlled.shutdown.retry.backoff.ms = 5000
	controller.socket.timeout.ms = 30000
	create.topic.policy.class.name = null
	default.replication.factor = 1
	delete.records.purgatory.purge.interval.requests = 1
	delete.topic.enable = true
	fetch.purgatory.purge.interval.requests = 1000
	group.initial.rebalance.delay.ms = 0
	group.max.session.timeout.ms = 300000
	group.min.session.timeout.ms = 6000
	host.name = 
	inter.broker.listener.name = null
	inter.broker.protocol.version = 1.0-IV0
	leader.imbalance.check.interval.seconds = 300
	leader.imbalance.per.broker.percentage = 10
	listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
	listeners = PLAINTEXT://:9092
	log.cleaner.backoff.ms = 15000
	log.cleaner.dedupe.buffer.size = 134217728
	log.cleaner.delete.retention.ms = 86400000
	log.cleaner.enable = true
	log.cleaner.io.buffer.load.factor = 0.9
	log.cleaner.io.buffer.size = 524288
	log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
	log.cleaner.min.cleanable.ratio = 0.5
	log.cleaner.min.compaction.lag.ms = 0
	log.cleaner.threads = 1
	log.cleanup.policy = [delete]
	log.dir = /tmp/kafka-logs
	log.dirs = /var/dabing/kafka/kafka-logs
	log.flush.interval.messages = 9223372036854775807
	log.flush.interval.ms = null
	log.flush.offset.checkpoint.interval.ms = 60000
	log.flush.scheduler.interval.ms = 9223372036854775807
	log.flush.start.offset.checkpoint.interval.ms = 60000
	log.index.interval.bytes = 4096
	log.index.size.max.bytes = 10485760
	log.message.format.version = 1.0-IV0
	log.message.timestamp.difference.max.ms = 9223372036854775807
	log.message.timestamp.type = CreateTime
	log.preallocate = false
	log.retention.bytes = -1
	log.retention.check.interval.ms = 300000
	log.retention.hours = 168
	log.retention.minutes = null
	log.retention.ms = null
	log.roll.hours = 168
	log.roll.jitter.hours = 0
	log.roll.jitter.ms = null
	log.roll.ms = null
	log.segment.bytes = 1073741824
	log.segment.delete.delay.ms = 60000
	max.connections.per.ip = 2147483647
	max.connections.per.ip.overrides = 
	message.max.bytes = 1000012
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	min.insync.replicas = 1
	num.io.threads = 8
	num.network.threads = 3
	num.partitions = 1
	num.recovery.threads.per.data.dir = 1
	num.replica.fetchers = 1
	offset.metadata.max.bytes = 4096
	offsets.commit.required.acks = -1
	offsets.commit.timeout.ms = 5000
	offsets.load.buffer.size = 5242880
	offsets.retention.check.interval.ms = 600000
	offsets.retention.minutes = 1440
	offsets.topic.compression.codec = 0
	offsets.topic.num.partitions = 50
	offsets.topic.replication.factor = 1
	offsets.topic.segment.bytes = 104857600
	port = 9092
	principal.builder.class = null
	producer.purgatory.purge.interval.requests = 1000
	queued.max.request.bytes = -1
	queued.max.requests = 500
	quota.consumer.default = 9223372036854775807
	quota.producer.default = 9223372036854775807
	quota.window.num = 11
	quota.window.size.seconds = 1
	replica.fetch.backoff.ms = 1000
	replica.fetch.max.bytes = 1048576
	replica.fetch.min.bytes = 1
	replica.fetch.response.max.bytes = 10485760
	replica.fetch.wait.max.ms = 500
	replica.high.watermark.checkpoint.interval.ms = 5000
	replica.lag.time.max.ms = 10000
	replica.socket.receive.buffer.bytes = 65536
	replica.socket.timeout.ms = 30000
	replication.quota.window.num = 11
	replication.quota.window.size.seconds = 1
	request.timeout.ms = 30000
	reserved.broker.max.id = 1000
	sasl.enabled.mechanisms = [GSSAPI]
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.principal.to.local.rules = [DEFAULT]
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism.inter.broker.protocol = GSSAPI
	security.inter.broker.protocol = PLAINTEXT
	socket.receive.buffer.bytes = 102400
	socket.request.max.bytes = 104857600
	socket.send.buffer.bytes = 102400
	ssl.cipher.suites = null
	ssl.client.auth = none
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
	transaction.max.timeout.ms = 900000
	transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
	transaction.state.log.load.buffer.size = 5242880
	transaction.state.log.min.isr = 1
	transaction.state.log.num.partitions = 50
	transaction.state.log.replication.factor = 1
	transaction.state.log.segment.bytes = 104857600
	transactional.id.expiration.ms = 604800000
	unclean.leader.election.enable = false
	zookeeper.connect = zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka
	zookeeper.connection.timeout.ms = 6000
	zookeeper.session.timeout.ms = 6000
	zookeeper.set.acl = false
	zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-11-12 06:59:47,192] INFO starting (kafka.server.KafkaServer)
[2020-11-12 06:59:47,195] INFO Connecting to zookeeper on zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka (kafka.server.KafkaServer)
[2020-11-12 06:59:47,247] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,247] INFO Client environment:host.name=kafkaCluster22 (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,247] INFO Client environment:java.version=1.8.0_261 (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,247] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,248] INFO Client environment:java.home=/usr/java/jdk1.8.0_261-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,248] INFO Client environment:java.class.path=:/opt/kafka_2.12-1.0.2/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/opt/kafka_2.12-1.0.2/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka_2.12-1.0.2/bin/../libs/commons-lang3-3.5.jar:/opt/kafka_2.12-1.0.2/bin/../libs/connect-api-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/connect-file-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/connect-json-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/connect-runtime-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/connect-transforms-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/guava-20.0.jar:/opt/kafka_2.12-1.0.2/bin/../libs/hk2-api-2.5.0-b32.jar:/opt/kafka_2.12-1.0.2/bin/../libs/hk2-locator-2.5.0-b32.jar:/opt/kafka_2.12-1.0.2/bin/../libs/hk2-utils-2.5.0-b32.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-annotations-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-core-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-databind-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javassist-3.20.0-GA.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javassist-3.21.0-GA.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javax.annotation-api-1.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javax.inject-1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javax.inject-2.5.0-b32.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka_2.12-1.0.2/bin/../libs/javax.ws.rs-api-2.0.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-client-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-common-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-container-servlet-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-guava-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-media-jaxb-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jersey-se
rver-2.25.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-http-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-io-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-security-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-server-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jetty-util-9.2.22.v20170606.jar:/opt/kafka_2.12-1.0.2/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka_2.12-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka_2.12-1.0.2-sources.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka_2.12-1.0.2-test-sources.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka-clients-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka-log4j-appender-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka-streams-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka-streams-examples-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/kafka-tools-1.0.2.jar:/opt/kafka_2.12-1.0.2/bin/../libs/log4j-1.2.17.jar:/opt/kafka_2.12-1.0.2/bin/../libs/lz4-java-1.4.jar:/opt/kafka_2.12-1.0.2/bin/../libs/maven-artifact-3.5.0.jar:/opt/kafka_2.12-1.0.2/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka_2.12-1.0.2/bin/../libs/osgi-resource-locator-1.0.1.jar:/opt/kafka_2.12-1.0.2/bin/../libs/plexus-utils-3.0.24.jar:/opt/kafka_2.12-1.0.2/bin/../libs/reflections-0.9.11.jar:/opt/kafka_2.12-1.0.2/bin/../libs/rocksdbjni-5.7.3.jar:/opt/kafka_2.12-1.0.2/bin/../libs/scala-library-2.12.4.jar:/opt/kafka_2.12-1.0.2/bin/../libs/slf4j-api-1.7.25.jar:/opt/kafka_2.12-1.0.2/bin/../libs/slf4j-log4j12-1.7.25.jar:/opt/kafka_2.12-1.0.2/bin/../libs/snappy-java-1.1.4.jar:/opt/kafka_2.12-1.0.2/bin/../libs/validation-api-1.1.0.Final.jar:/opt/kafka_2.12-1.0.2/bin/../libs/zkclient-0.10.jar:/opt/kafka_2.12-1.0.2/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,250] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,250] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:os.version=3.10.0-957.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,251] INFO Client environment:user.dir=/opt/kafka_2.12-1.0.2 (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,253] INFO Initiating client connection, connectString=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181 sessionTimeout=6000 [email protected] (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,303] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-12 06:59:47,323] INFO Opening socket connection to server zookeeperCluster11/192.168.80.11:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,328] INFO Socket connection established to zookeeperCluster11/192.168.80.11:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,331] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2020-11-12 06:59:47,347] INFO Session establishment complete on server zookeeperCluster11/192.168.80.11:2181, sessionid = 0x100009c182d0001, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,372] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2020-11-12 06:59:47,438] INFO Created zookeeper path /myKafka (kafka.server.KafkaServer)
[2020-11-12 06:59:47,439] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-12 06:59:47,444] INFO Session: 0x100009c182d0001 closed (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,446] INFO EventThread shut down for session: 0x100009c182d0001 (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,447] INFO Initiating client connection, connectString=zookeeperCluster10:2181,zookeeperCluster11:2181,zookeeperCluster12:2181/myKafka sessionTimeout=6000 [email protected] (org.apache.zookeeper.ZooKeeper)
[2020-11-12 06:59:47,451] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-12 06:59:47,462] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2020-11-12 06:59:47,480] INFO Opening socket connection to server zookeeperCluster11/192.168.80.11:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,486] INFO Socket connection established to zookeeperCluster11/192.168.80.11:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,507] INFO Session establishment complete on server zookeeperCluster11/192.168.80.11:2181, sessionid = 0x100009c182d0002, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-12 06:59:47,507] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2020-11-12 06:59:48,029] INFO Cluster ID = ao6W1AN6SIaZD-x0PGk3yg (kafka.server.KafkaServer)
[2020-11-12 06:59:48,039] WARN No meta.properties file under dir /var/dabing/kafka/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-12 06:59:48,214] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-12 06:59:48,214] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-12 06:59:48,215] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-12 06:59:48,381] INFO Log directory '/var/dabing/kafka/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2020-11-12 06:59:48,569] INFO Loading logs. (kafka.log.LogManager)
[2020-11-12 06:59:48,617] INFO Logs loading complete in 48 ms. (kafka.log.LogManager)
[2020-11-12 06:59:49,792] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-11-12 06:59:49,805] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-11-12 06:59:51,027] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2020-11-12 06:59:51,032] INFO [SocketServer brokerId=2] Started 1 acceptor threads (kafka.network.SocketServer)
[2020-11-12 06:59:51,092] INFO [ExpirationReaper-2-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:51,092] INFO [ExpirationReaper-2-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:51,152] INFO [ExpirationReaper-2-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:51,176] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-11-12 06:59:51,270] INFO [ExpirationReaper-2-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:51,437] INFO [ExpirationReaper-2-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:51,438] INFO [ExpirationReaper-2-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-12 06:59:52,960] INFO [GroupCoordinator 2]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 06:59:52,985] INFO [GroupCoordinator 2]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2020-11-12 06:59:52,995] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 9 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-12 06:59:53,072] INFO [ProducerId Manager 2]: Acquired new producerId block (brokerId:2,blockStartProducerId:2000,blockEndProducerId:2999) by writing to Zk with path version 3 (kafka.coordinator.transaction.ProducerIdManager)
[2020-11-12 06:59:53,168] INFO [TransactionCoordinator id=2] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-12 06:59:53,183] INFO [TransactionCoordinator id=2] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-12 06:59:53,194] INFO [Transaction Marker Channel Manager 2]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-11-12 06:59:53,318] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-12 06:59:53,333] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-12 06:59:53,347] INFO Registered broker 2 at path /brokers/ids/2 with addresses: EndPoint(kafkaCluster22,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2020-11-12 06:59:53,352] WARN No meta.properties file under dir /var/dabing/kafka/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-12 06:59:53,424] INFO [SocketServer brokerId=2] Started processors for 1 acceptors (kafka.network.SocketServer)
[2020-11-12 06:59:53,444] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-12 06:59:53,444] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-12 06:59:53,445] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
           

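10. Verify the Kafka Cluster ID

  1. The Cluster ID is a unique, immutable identifier for a Kafka cluster. The startup log of kafkaCluster22 reports it as Cluster ID = ao6W1AN6SIaZD-x0PGk3yg.
  2. It consists of at most 22 characters drawn from the URL-safe Base64 alphabet.
  3. In Kafka 0.10.1 and later, a starting broker reads the ID from the <Kafka_ROOT>/cluster/id znode in ZooKeeper. If the ID does not exist yet, which is the case when the cluster starts for the first time, a new one is generated automatically.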
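
The format rule in item 2 can be turned into a quick check. The sketch below tests whether a string has the shape of a Cluster ID; is_cluster_id is a hypothetical helper, and it checks only the length and alphabet, not whether the ID belongs to a real cluster.

```shell
# Sketch: check a string against the Cluster ID shape described in the list.
is_cluster_id() {
    # 1 to 22 characters from the URL-safe Base64 alphabet: A-Z a-z 0-9 _ -
    printf '%s' "$1" | grep -Eq '^[A-Za-z0-9_-]{1,22}$'
}

# The ID from the startup log in this guide passes the check.
is_cluster_id 'ao6W1AN6SIaZD-x0PGk3yg' && echo "valid"

# Spaces and punctuation are outside the alphabet.
is_cluster_id 'not a cluster id!' || echo "invalid"
```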

11. Connect to any node of the ZooKeeper ensemble with zkCli.sh

Inspect the broker information, starting from the root znodes

	[zk: localhost:2181(CONNECTED) 0] ls /
	[myKafka, zookeeper]

List the contents of /myKafka in ZooKeeper

	[zk: localhost:2181(CONNECTED) 1] ls /myKafka
	[cluster, controller, controller_epoch, brokers, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

Inspect the three broker registrations under /myKafka

[zk: localhost:2181(CONNECTED) 2] get /myKafka/brokers/ids/0
	{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaCluster21:9092"],"jmx_port":-1,"host":"kafkaCluster21","timestamp":"1605182346116","port":9092,"version":4}
	cZxid = 0x100000020
	ctime = Thu Nov 12 06:59:06 EST 2020
	mZxid = 0x100000020
	mtime = Thu Nov 12 06:59:06 EST 2020
	pZxid = 0x100000020
	cversion = 0
	dataVersion = 0
	aclVersion = 0
	ephemeralOwner = 0x20000a1ccba0001
	dataLength = 198
	numChildren = 0
           
[zk: localhost:2181(CONNECTED) 3] get /myKafka/brokers/ids/1
	{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaCluster20:9092"],"jmx_port":-1,"host":"kafkaCluster20","timestamp":"1605182384675","port":9092,"version":4}
	cZxid = 0x100000027
	ctime = Thu Nov 12 06:59:44 EST 2020
	mZxid = 0x100000027
	mtime = Thu Nov 12 06:59:44 EST 2020
	pZxid = 0x100000027
	cversion = 0
	dataVersion = 0
	aclVersion = 0
	ephemeralOwner = 0x100009c182d0000
	dataLength = 198
	numChildren = 0
           
[zk: localhost:2181(CONNECTED) 4] get /myKafka/brokers/ids/2
	{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaCluster22:9092"],"jmx_port":-1,"host":"kafkaCluster22","timestamp":"1605182393295","port":9092,"version":4}
	cZxid = 0x10000002e
	ctime = Thu Nov 12 06:59:53 EST 2020
	mZxid = 0x10000002e
	mtime = Thu Nov 12 06:59:53 EST 2020
	pZxid = 0x10000002e
	cversion = 0
	dataVersion = 0
	aclVersion = 0
	ephemeralOwner = 0x100009c182d0002
	dataLength = 198
	numChildren = 0
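
The first line of each result is the JSON that the broker registered, and its endpoints field should equal the advertised.listeners value configured on that broker. As a sketch of how that field could be pulled out for comparison, the hypothetical helper broker_endpoint below uses sed on the JSON text. It prints only the first endpoint in the list, and a real JSON parser would be more robust.

```shell
# Sketch: pull the first advertised endpoint out of a broker registration.
broker_endpoint() {
    # Reads the JSON on stdin and prints the first entry of "endpoints".
    sed -n 's/.*"endpoints":\["\([^"]*\)".*/\1/p'
}

# Registration of broker 0 as shown in the zkCli.sh output.
json='{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaCluster21:9092"],"jmx_port":-1,"host":"kafkaCluster21","timestamp":"1605182346116","port":9092,"version":4}'

printf '%s\n' "$json" | broker_endpoint
```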
           
Find the broker that hosts the cluster controller

[zk: localhost:2181(CONNECTED) 5] get /myKafka/controller
	{"version":1,"brokerid":0,"timestamp":"1605182345723"}
	cZxid = 0x100000019
	ctime = Thu Nov 12 06:59:05 EST 2020
	mZxid = 0x100000019
	mtime = Thu Nov 12 06:59:05 EST 2020
	pZxid = 0x100000019
	cversion = 0
	dataVersion = 0
	aclVersion = 0
	ephemeralOwner = 0x20000a1ccba0001
	dataLength = 54
	numChildren = 0
	[zk: localhost:2181(CONNECTED) 6]
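
The brokerid field in the /controller data identifies the controller, and the broker.id values assigned earlier map it back to a machine. The snippet below is an illustrative sketch of that lookup; both function names are made up, and the id-to-host table simply restates the configuration from this guide.

```shell
# Sketch: read the controller's broker id and map it to a hostname.
controller_broker_id() {
    # Reads the /controller JSON on stdin and prints the numeric brokerid.
    sed -n 's/.*"brokerid":\([0-9][0-9]*\).*/\1/p'
}

host_for_broker() {
    # broker.id assignments from the server.properties files in this guide.
    case "$1" in
        0) echo kafkaCluster21 ;;
        1) echo kafkaCluster20 ;;
        2) echo kafkaCluster22 ;;
        *) echo unknown ;;
    esac
}

json='{"version":1,"brokerid":0,"timestamp":"1605182345723"}'
id="$(printf '%s\n' "$json" | controller_broker_id)"
echo "controller: broker.id=$id on $(host_for_broker "$id")"
```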
           
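Kafka elects the cluster controller through ZooKeeper's distributed-lock capability: the first broker to create the ephemeral /controller znode becomes the controller.

At this point the cluster controller is the broker with brokerid=0, which is kafkaCluster21.

Controller epoch increments prevent "split-brain"

Each newly elected controller obtains a new, strictly larger controller epoch through a conditional increment in ZooKeeper. Once the other brokers know the current controller epoch, they ignore any message from a controller that carries an older epoch, which prevents split-brain.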
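
The fencing rule can be illustrated with a few lines of shell. This is a toy model of the broker-side check only, not Kafka's actual implementation: current_epoch and handle_controller_message are hypothetical names, and a message is reduced to the epoch number it carries.

```shell
# Sketch: a broker remembers the highest controller epoch it has seen
# and ignores messages that carry an older one.
current_epoch=0

handle_controller_message() {
    # $1 = controller epoch carried by the incoming message
    if [ "$1" -lt "$current_epoch" ]; then
        echo "ignored (stale epoch $1 < $current_epoch)"
    else
        current_epoch="$1"
        echo "accepted (epoch $1)"
    fi
}

handle_controller_message 1   # message from the first controller
handle_controller_message 2   # a new controller takes over after a failover
handle_controller_message 1   # the old controller resurfaces and is fenced off
```

The third call is rejected because the broker has already seen epoch 2, so a controller that was deposed while it was unreachable cannot push outdated decisions once it comes back.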
