天天看点

Kafka Docker安装配置

Docker常用命令

下载docker镜像

docker pull wurstmeister/kafka      
docker pull wurstmeister/zookeeper      

启动zk容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper      
olafwang@OLAFWANG-MB0 ~ % docker run -d -name zookeeper -p 2181:2181 wurstmeister/zookeeper
unknown shorthand flag: 'n' in -name
See 'docker run --help'.
olafwang@OLAFWANG-MB0 ~ % docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
c58c4af034290ae1f0ad0b2c2f4214068dcae2b16fffa44ef8e1f45e3cdf1e3a
olafwang@OLAFWANG-MB0 ~ % docker ps
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
c58c4af03429        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   4 minutes ago       Up 4 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
olafwang@OLAFWANG-MB0 ~ %      

ZooInspector 图形化工具

ZooInspector是ZooKeeper的图形化工具,ZooInspector下载地址:

​​https://issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip​​

解压,进入​

​ZooInspector\build​

​目录,通过如下命令执行jar包:

java -jar zookeeper-dev-ZooInspector.jar  & //执行成功后,会弹出java ui client      

搭建单机kafka

docker run -d --name kafka1 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=10.35.143.132:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.35.143.132:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka      
olafwang@OLAFWANG-MB0 ~ % docker run -d --name kafka1 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=10.35.143.132:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.35.143.132:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

0c32f822f0fd40230720a079df57274411b430e9ad7aed8263678cb0da533d8a
olafwang@OLAFWANG-MB0 ~ % docker ps -a      
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
0c32f822f0fd        wurstmeister/kafka       "start-kafka.sh"         2 minutes ago       Up 2 minutes        0.0.0.0:9092->9092/tcp                               kafka1
c58c4af03429        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   30 minutes ago      Up 30 minutes       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper
olafwang@OLAFWANG-MB0 ~ %      

指定容器命名为kafka1,指定了broker_id为0,端口号9092,zk地址为本机2181。

登录kafka容器

docker exec -it kafka1 bash      
olafwang@OLAFWANG-MB0 ~ % docker exec -it kafka1 bash 
bash-4.4#      

测试

创建topic

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic test --zookeeper 10.35.143.132:2181 --replication-factor 1 --partitions 1
Created topic test.
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper 10.35.143.132:2181 --describe --topic test
Topic: test PartitionCount: 1 ReplicationFactor: 1  Configs: 
  Topic: test Partition: 0  Leader: 0 Replicas: 0 Isr: 0
bash-4.4#      

创建名为test 的topic指定zk地址、副本数为1、分区数为1。

发送消息

bash-4.4# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 10.35.143.132:9092 --topic test 
>123
>321
>1234567
>q!
>exit
>:q!
>
>^Cbash-4.4#      

使用kafka-console-producer.sh发送消息,指定broker的地址。

消费消息

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 10.35.143.132:9092 --topic test --group testGroupId --from-beginning   
123
321
1234567
q!
exit
:q!      

使用kafka-console-consumer.sh消费消息,指定broker的地址。

搭建集群

kafka只需要执行相同的zk地址,那么kafka就可以自动构建起集群。而使用docker命令可快速在同一台机器启动多个kafka,只需要改变brokerId和端口即可,如下命令,新创建了kafka2和kafka3容器:

docker run -d --name kafka2 -p 9093:9093 -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=10.35.143.132:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.35.143.132:9093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka

docker run -d --name kafka3 -p 9094:9094 -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=10.35.143.132:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.35.143.132:9094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka      
olafwang@OLAFWANG-MB0 ~ % docker start kafka2
kafka2
olafwang@OLAFWANG-MB0 ~ % docker start kafka3
kafka3
olafwang@OLAFWANG-MB0 ~ % docker ps -a
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
b87a7b2687d6        wurstmeister/kafka       "start-kafka.sh"         31 seconds ago      Up 30 seconds       0.0.0.0:9094->9094/tcp                               kafka3
f8b73c742001        wurstmeister/kafka       "start-kafka.sh"         39 seconds ago      Up 38 seconds       0.0.0.0:9093->9093/tcp                               kafka2
0c32f822f0fd        wurstmeister/kafka       "start-kafka.sh"         2 hours ago         Up 2 hours          0.0.0.0:9092->9092/tcp                               kafka1
c58c4af03429        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   2 hours ago         Up 2 hours          22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   zookeeper      

通过 ​

​docker logs kafka2​

​可以查看容器日志。

测试

创建topic

bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --create --topic test_cluster_topic --zookeeper 10.35.143.132:2181 --replication-factor 2 --partitions 3
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test_cluster_topic.
bash-4.4# $KAFKA_HOME/bin/kafka-topics.sh --zookeeper 10.35.143.132:2181 --describe --topic test_cluster_topic
Topic: test_cluster_topic PartitionCount: 3 ReplicationFactor: 2  Configs: 
  Topic: test_cluster_topic Partition: 0  Leader: 1 Replicas: 1,0 Isr: 1,0
  Topic: test_cluster_topic Partition: 1  Leader: 2 Replicas: 2,1 Isr: 2,1
  Topic: test_cluster_topic Partition: 2  Leader: 0 Replicas: 0,2 Isr: 0,2
bash-4.4#      

创建名为test_cluster_topic 的topic指定zk地址、副本数2、分区数为3。

发送消息

bash-4.4#  $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 10.35.143.132:9092 --topic test_cluster_topic 
>123
>321
>^Cbash-4.4#      

使用kafka-console-producer.sh发送消息,指定broker的地址。

消费消息

bash-4.4# $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 10.35.143.132:9092 --topic test_cluster_topic --group testGroupId --from-beginning
123
321      

使用kafka-console-consumer.sh消费消息,指定broker的地址。

Kafka Eagle 安装

Kafka Eagle是kafka的web版的管理页面:​​javascript:void(0)​​

安装kafka-manager

下载镜像

docker pull sheepkiller/kafka-manager      

启动kafka-manager

docker run -d --name kafka-manager --link zookeeper:zookeeper --link kafka1:kafka1 -p 9001:9000 --restart=always --env ZK_HOSTS=zookeeper:2181 sheepkiller/kafka-manager      

访问kafka-manager

​​http://127.0.0.1:9001/​​

Kafka Server端核心配置

#broker 的全局唯一编号,不能重复 
broker.id=0
#服务端口,默认9092
port=9092
# 是否启用删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量 
num.network.threads=3
# 用来处理磁盘 IO 的线程数量 
num.io.threads=8
# 发送套接字的缓冲区大小 
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小 
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小 
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径(也是消息的存放地址)
log.dirs=/opt/module/kafka/logs 
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量 
num.recovery.threads.per.data.dir=1 
# segment 文件保留的最长时间,超时将被删除(默认7天)
log.retention.hours=168
# 配置连接 Zookeeper 集群地址 
zookeeper.connect=zookeeper101:2181,zookeeper102:2181,zookeeper103:2181

# 对于segment日志的索引文件大小限制
log.index.size.max.bytes = 10 * 1024 * 1024
# 索引计算的一个缓冲区,一般不需要设置。
log.index.interval.bytes = 4096      

Kafka 客户端配置

# 配置的broker地址,不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker
bootstrap.servers=host1:port1;host2:port2…
# Message消息key和value的序列化方式
key.descrializer=
value.descrializer=
# 消费者组ID。默认值是 “”。
group.id=ConsumerGroupId
# Consumer每次调用poll()时取到的records的最大数。
max.poll.records=100
#配置consumer与coordinator之间的心跳间隔时间。心跳是确定consumer存活,加入或者退出group的有效手段。默认值是:3000 (3s)
# 这个值必须设置的小于session.timeout.ms,因为:当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。    
# 通常设置的值要低于session.timeout.ms的1/3。
heartbeat.interval.ms=3000
# Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。其默认值是:10000 (10 s)
·session.timeout.ms=10000
# Consumer 在commit offset时有两种模式:自动提交,手动提交。自动提交:是Kafka Consumer会在后台周期性的去commit。默认值是true。
enable.auto.commit=true
# 自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
auto.commit.interval.ms=5000
#  Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:
# 1. earliest:自动重置到最早的offset。
# 2. latest:看上去重置到最晚的offset。
# 3. none:如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
# 4. 如果不是上述3种,只抛出异常给consumer。
auto.offset.reset=latest
# consumer到broker之间的连接空闲超时时间
connections.max.idle.ms=540000
# Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。
fetch.max.wait.ms=
# 当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。取值范围是:[0, Integer.Max],默认值是1。
fetch.min.bytes=1
# 一次fetch请求,从一个broker中取得的records最大大小。如果单条消息就超过了这个限制,那么一次就只返回一条消息。取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)
fetch.max.bytes=52428800
# 一次fetch请求,从一个partition中取得的records最大大小。如果单条消息就超过了这个限制,那么一次就只返回一条消息。
# broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。
max.partition.fetch.bytes=52428800
# 前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。
max.poll.interval.ms=
# 接收套接字的缓冲区大小, 默认值是:65536 (64 KB),如果值设置为-1,则会使用操作系统默认的值。
receive.buffer.byte=65536
# 请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)
request.timeout.ms=305000
# Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。
client.id=xxx
# Producer拦截器(interceptor),interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。
interceptor.classes=
# Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。默认5分钟
metadata.max.age.ms=300000      
上一篇: ios版本号