天天看点

KAFKA集群搭建

环境:

CentOS 6.5

KAFKA版本: kafka_2.11-0.8.2.1

ZOOKEEPER版本: zookeeper-3.4.6

JDK: 1.8.0_151

SERVER: 172.16.2.27、172.16.2.28、172.16.2.29

一、准备:

1、安装JDK1.8

2、下载kafka和zookeeper安装包(二进制包)

下载地址:

二、安装zookeeper集群

1

2

3

4

5

6

7

8

9

10

11

12

<code>##将安装文件解压缩到/usr/local目录</code>

<code>tar</code> <code>zxf zookeeper-3.4.6.</code><code>tar</code><code>.gz -C </code><code>/usr/local</code>

<code>##创建软连接</code>

<code>ln</code> <code>-s </code><code>/usr/local/zookeeper-3</code><code>.4.6  </code><code>/usr/local/zookeeper</code>

<code>##从模板复制配置文件</code>

<code>cd</code> <code>/usr/local/zookeeper/conf</code>

<code>cp</code> <code>zoo_sample.cfg  zoo.cfg</code>

<code>##创建data目录</code>

<code>mkdir</code> <code>-p </code><code>/data/zookeeper/</code><code>{zkdata,zkdatalog}</code>

修改配置文件

13

14

15

16

<code>cat</code> <code>zoo.cfg | </code><code>egrep</code> <code>-</code><code>v</code> <code>"^#|^$"</code>

<code>tickTime=2000  </code>

<code>initLimit=10</code>

<code>syncLimit=5</code>

<code>dataDir=</code><code>/data/zookeeper/zkdata</code>

<code>dataLogDir=</code><code>/data/zookeeper/zkdatalog</code>

<code>clientPort=2181</code>

<code>server.1=172.16.2.27:2888:3888</code>

<code>server.2=172.16.2.28:2888:3888</code>

<code>server.3=172.16.2.29:2888:3888</code>

<code>autopurge.snapRetainCount=30</code>

<code>autopurge.purgeInterval=24</code>

<code>###server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器</code>

<code>这个标识要写到快照目录下面myid文件里</code>

<code>#172.16.2.27为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,</code>

<code>第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888</code>

三台服务器上的配置是一样的

配置文件解释:

<code>#tickTime:</code>

<code>这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。</code>

<code>#initLimit:</code>

<code>这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。</code>

<code>当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒</code>

<code>#syncLimit:</code>

<code>这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒</code>

<code>#dataDir:</code>

<code>快照日志的存储路径</code>

<code>#dataLogDir:</code>

<code>事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多</code>

<code>#clientPort:</code>

<code>这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点</code>

创建myid文件

<code>#server1</code>

<code>echo</code> <code>"1"</code> <code>&gt; </code><code>/data/zookeeper/zkdata/myid</code>

<code>#server2</code>

<code>echo</code> <code>"2"</code> <code>&gt; </code><code>/data/zookeeper/zkdata/myid</code>

<code>#server3</code>

<code>echo</code> <code>"3"</code> <code>&gt;</code><code>/data/zookeeper/zkdata/myid</code>

重要说明:

1、myid文件和server.myid  在快照目录下存放的标识本台服务器的文件,他是整个zk集群用来发现彼此的一个重要标识。

2、zoo.cfg 文件是zookeeper配置文件 在conf目录里。

3、log4j.properties文件是zk的日志输出文件 在conf目录里用java写的程序基本上有个共同点日志都用log4j,来进行管理。

4、zkEnv.sh和zkServer.sh文件

zkServer.sh 主的管理程序文件

zkEnv.sh 是主要配置,zookeeper集群启动时配置环境变量的文件

5、还有一个需要注意

ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator

zookeeper不会主动的清除旧的快照和日志文件,这个是操作者的责任。

启动ZooKeeper服务

<code>cd</code> <code>/usr/local/zookeeper/bin</code>

<code>.</code><code>/zkServer</code><code>.sh start</code>

<code>##查看状态</code>

<code>.</code><code>/zkServer</code><code>.sh status</code>

<code>JMX enabled by default</code>

<code>Using config: </code><code>/usr/local/zookeeper/bin/</code><code>..</code><code>/conf/zoo</code><code>.cfg  </code><code>##zookeeper使用的配置文件</code>

<code>Mode: follower  </code><code>##zookeeper的角色</code>

三、安装KAFKA集群

<code>tar</code> <code>zxf kafka_2.11-0.8.2.1.tgz -C </code><code>/usr/local</code>

<code>ln</code> <code>-s </code><code>/usr/local/kafka_2</code><code>.11-0.8.2.1  </code><code>/usr/local/kafka</code>

<code>cd</code> <code>/usr/local/kafka</code>

<code>cp</code> <code>server.properties server.properties.bak</code>

17

18

<code>broker.</code><code>id</code><code>=0  </code><code>#当前机器在集群中的唯一标识,和zookeeper的myid性质一样</code>

<code>port=19092 </code><code>#当前kafka对外提供服务的端口默认是9092</code>

<code>host.name=172.16.2.27 </code><code>#这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。</code>

<code>num.network.threads=3 </code><code>#这个是borker进行网络处理的线程数</code>

<code>num.io.threads=8 </code><code>#这个是borker进行I/O处理的线程数</code>

<code>log.</code><code>dirs</code><code>=log.</code><code>dirs</code><code>=</code><code>/data/kafka/kafka-logs</code> <code>#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个</code>

<code>socket.send.buffer.bytes=102400 </code><code>#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能</code>

<code>socket.receive.buffer.bytes=102400 </code><code>#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘</code>

<code>socket.request.max.bytes=104857600 </code><code>#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小</code>

<code>num.partitions=1 </code><code>#默认的分区数,一个topic默认1个分区数</code>

<code>log.retention.hours=168 </code><code>#默认消息的最大持久化时间,168小时,7天</code>

<code>message.max.byte=5242880  </code><code>#消息保存的最大值5M</code>

<code>default.replication.factor=2  </code><code>#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务</code>

<code>replica.fetch.max.bytes=5242880  </code><code>#取消息的最大直接数</code>

<code>log.segment.bytes=1073741824 </code><code>#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件</code>

<code>log.retention.check.interval.ms=300000 </code><code>#每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除</code>

<code>log.cleaner.</code><code>enable</code><code>=</code><code>false</code> <code>#是否启用log压缩,一般不用启用,启用的话可以提高性能</code>

<code>zookeeper.connect=172.16.2.27:2181,172.16.2.28:2181,172.16.2.29:2181 </code><code>#设置zookeeper的连接端口</code>

<code>broker.</code><code>id</code><code>=0  每台服务器的broker.</code><code>id</code><code>都不能相同</code>

<code>host.name=172.16.2.27</code>

<code>#在log.retention.hours=168 下面新增下面三项</code>

<code>message.max.byte=5242880</code>

<code>default.replication.factor=2</code>

<code>replica.fetch.max.bytes=5242880</code>

<code>#设置zookeeper的连接端口</code>

<code>zookeeper.connect=172.16.2.27:2181,172.16.2.28:2181,172.16.2.29:2181</code>

启动kafka服务并测试:

<code>#从后台启动Kafka集群(3台都需要启动)</code>

<code>cd</code> <code>/usr/local/kafka/bin/</code> <code>#进入到kafka的bin目录 </code>

<code>.</code><code>/kafka-server-start</code><code>.sh -daemon ..</code><code>/config/server</code><code>.properties</code>

检查kafka服务是否启动

<code># jps</code>

<code>67137 Kafka</code>

<code>111089 ProdServerStart</code>

<code>122216 Jps</code>

<code>114635 QuorumPeerMain</code>

三台服务器上配置基本一样,除了下面两条配置不同

<code>broker.</code><code>id</code><code>=0</code>

<code></code>

本文转自 曾哥最爱 51CTO博客,原文链接:http://blog.51cto.com/zengestudy/2049745,如需转载请自行联系原作者

上一篇: 杂乱笔记