文章目录
-
-
- 一 sparksql练习
-
- 1.1数据准备
-
- 1.1.1 employee.json
- 1.1.2 employee2.json
- 1.1.3 department
- 1.2 sparkSQL-->typed
-
- 1.2.1 强类型练习
- 1.2.1 输出结果
- 1.3 SparkSQL常用方法
-
- 1.3.1 实例代码
- 1.3.2 输出结果
- 1.4 自定义UDF, 实现统计字符串的长度
-
- 1.4.1 自定义UDF需要继承udx[1,2,3...22], 可以传入最多22个参数
- 1.4.2 输出结果
- 1.5 UDAF实现Wordcount
-
- 1.5.1 自定义UDAF
- 1.5.2 调用自定义的UDF
- 1.5.3 输出结果
- 二 kafka
-
- 2.1 kafka简介
- 2.2 Kafka核心组件(重要)
- 2.3 Kafka整体结构图
- 2.4 Kafka名词解释和工作方式
- 2.5 Consumer与topic关系
- 2.6 Kafka消息的分发
-
- 2.6.1 Producer客户端负责消息的分发
- 2.6.2 Producer消息发送的应答机制
- 2.7 Consumer的负载均衡
- 三 Kafka集群部署
-
- 3.1 集群部署的基本流程
- 3.2 集群部署的基础环境准备
- 3.3 Kafka集群部署
-
- 3.3.1 下载安装包
- 3.3.2 解压安装包
- 3.3.3 修改配置文件
-
- 3.3.3.1修改server.properties
- 3.3.3.2 修改producer.properties
- 3.3.3.3 consumer.properties
- 3.4 分发安装包
- 3.5 启动集群
-
一 sparksql练习
1.1数据准备
1.1.1 employee.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
{"name": "Jen", "age": 19, "depId": 2, "gender": "male", "salary": 8000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "XiaoFang", "age": 18, "depId": 3, "gender": "female", "salary": 58000}
1.1.2 employee2.json
{"name": "Leo", "age": 25, "depId": 1, "gender": "male", "salary": 20000.123}
{"name": "Marry", "age": 30, "depId": 2, "gender": "female", "salary": 25000}
{"name": "Jack", "age": 35, "depId": 1, "gender": "male", "salary": 15000}
{"name": "Tom", "age": 42, "depId": 3, "gender": "male", "salary": 18000}
{"name": "Kattie", "age": 21, "depId": 3, "gender": "female", "salary": 21000}
{"name": "Jen", "age": 30, "depId": 2, "gender": "female", "salary": 28000}
1.1.3 department
{"id": 1, "name": "Technical Department"}
{"id": 2, "name": "Financial Department"}
{"id": 3, "name": "HR Department"}
1.2 sparkSQL–>typed
1.2.1 强类型练习
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/*
* @Description: Typed
* ClassName TypedDemo
* @Author: WCH
* @CreateDate: 2019/1/8$ 9:52$
* @Version: 1.0
*/
object TypedDemo {
def main(args: Array[String]): Unit = {
val ss: SparkSession = SparkSession.builder().appName("TypedDemo").master("local[2]").getOrCreate()
import ss.implicits._
import org.apache.spark.sql.functions._
val employee: DataFrame = ss.read.json("F:\\IDEAWorkspace\\Spark00\\src\\main\\scala\\day06\\sparkSql\\employee.json")
val empDS: Dataset[Employee] = employee.as[Employee]
val employee2: DataFrame = ss.read.json("F:\\IDEAWorkspace\\Spark00\\src\\main\\scala\\day06\\sparkSql\\employee2.json")
val empDS2: Dataset[Employee] = employee2.as[Employee]
val department: DataFrame = ss.read.json("F:\\IDEAWorkspace\\Spark00\\src\\main\\scala\\day06\\sparkSql\\department.json")
val depDs: Dataset[Department] = department.as[Department]
// 打印分区数
println("初始分区数为: " + employee.rdd.partitions.size)
// 重分区, repartition默认为shuffle, coalese默认shuffle为false
val empRepart: Dataset[Row] = employee.repartition(8)
println("repartition分区数为: " + empRepart.rdd.partitions.size)
val empCoalese: Dataset[Row] = employee.coalesce(5)
println("coalese分区数为: " + empCoalese.rdd.partitions.size)
// distinct, dropDuplicates
// distinct是按照整条数据进行完整比对进行去重
// dropDuplicates是根据指定的一个或多个字段进行对比去重
employee.distinct().show()
employee.dropDuplicates(Array("name")).show()
// filter, except
// excep获取当前DataSet中有但是另一个DataSet中没有的元素, 相当于差集
// filter, 需要给一个过滤逻辑, 如果返回true, 则保留该元素, 否则删除该元素
empDS.except(empDS2).show() // 差集
empDS.union(empDS2).show() // 并集
empDS.intersect(empDS2).show() // 交集
empDS.filter(emp => emp.age > 30).show()
// joinWith: 将两个数据集join到一起, 需要指定join字段
empDS.joinWith(depDs, $"depId" === $"id").show()
// sort: 指定字段进行排序
empDS.sort($"age".desc).show()
// collect_set, collect_list
// collect_set: 将指定字段的值都收集到一起, 会按照这个字段进行去重
// collect_list: 同上, 但是不会按照字段进行去重
empDS.groupBy(empDS("depId"))
.agg(collect_set(empDS("name")), collect_list(empDS("name")))
.show()
// avg, sum max min count countDistinct
employee
.join(department, $"depId" === $"id")
.groupBy(department("name"))
.agg(avg(employee("salary")),
sum(employee("salary")),
max(employee("salary")), min(employee("salary")),
count(employee("name")),
countDistinct(employee("name"))).show()
// untyped: select where join groupBy agg
ss.stop()
}
}
case class Employee(name: String, age: Long, depId: Long, gender: String, salary: Double)
case class Department(id: Long, name: String)
1.2.1 输出结果
repartition默认为shuffle, coalese默认shuffle为false, 所以分区不能改变
distinct是按照整条数据进行完整比对进行去重
dropDuplicates是根据指定的一个或多个字段进行对比去重
差集
并集
交集
取出年龄大于30
joinWith: 将两个数据集join到一起
sort: 指定字段进行排序
collect_set: 将指定字段的值都收集到一起, 会按照这个字段进行去重
collect_list: 将指定字段的值都收集到一起, 但是不会按照字段进行去重
使用avg, sum max min count countDistinct方法
1.3 SparkSQL常用方法
1.3.1 实例代码
import org.apache.spark.sql.{DataFrame, SparkSession}
/*
* @Description: 日期方法: current_date, current_timestamp
* 数学方法: round---保留几位小数
* 随机方法: rand
* 字符串方法: concat, concat_ws
* ClassName OtherFuncDemo
* @Author: WCH
* @CreateDate: 2019/1/8$ 11:16$
* @Version: 1.0
*/
object OtherFuncDemo {
def main(args: Array[String]): Unit = {
val ss: SparkSession = SparkSession.builder()
.appName("OtherFuncDemo")
.master("local[2]")
.getOrCreate()
import ss.implicits._
import org.apache.spark.sql.functions._
val employee: DataFrame = ss.read.json("F:\\IDEAWorkspace\\Spark00\\src\\main\\scala\\day06\\sparkSql\\employee.json")
employee
.select(employee("name"),
current_date(),
current_timestamp(),
round(employee("salary"), 2),
rand(),
concat(employee("name"), employee("age")),
concat_ws("|", employee("name"), employee("age"))
).show()
ss.stop()
}
}
1.3.2 输出结果
1.4 自定义UDF, 实现统计字符串的长度
1.4.1 自定义UDF需要继承udx[1,2,3…22], 可以传入最多22个参数
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/*
* @Description: 自定义UDF, 实现统计字符串的长度
* ClassName UDFDemo
* @Author: WCH
* @CreateDate: 2019/1/8$ 11:36$
* @Version: 1.0
*/
object UDFDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("UDFDemo").setMaster("local[2]")
val ss: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// 模拟数据
val names = List("tom","jerry","shuke")
val namesDF: DataFrame = ss.createDataFrame(names.map(Person))
// 注册自定义UDF
ss.udf.register("strlen",(name:String)=>name.length)
// 注册临时视图
namesDF.createOrReplaceTempView("t_person")
// 调用UDF进行查询
val res: DataFrame = ss.sql("select name, strlen(name) from t_person")
res.show()
ss.stop()
}
}
case class Person(name:String)
1.4.2 输出结果
1.5 UDAF实现Wordcount
1.5.1 自定义UDAF
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
/*
* @Description: StringCount
* ClassName StringCount
* @Author: WCH
* @CreateDate: 2019/1/8$ 11:55$
* @Version: 1.0
*/
// udaf is used define aggreagate funtions for user
class WCUDAF extends UserDefinedAggregateFunction{
// 定义输入类型
override def inputSchema: StructType = {
StructType(Array(StructField("str",StringType,true)))
}
// 定义缓冲类型
override def bufferSchema: StructType = {
StructType(Array(StructField("count",IntegerType,true)))
}
// 定义输出类型, 即返回值类型
override def dataType: DataType = IntegerType
// 是否是确定的, 如果为true, 就代表返回值类型和输入值类型是相同的
override def deterministic: Boolean = true
// 初始化操作方法, 可以在这个方法中进行初始化值
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0
}
// 局部聚合(分区内聚合) buffer: 之前的值 input: 传进来的新值
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0)+1
}
// 全局聚合 buffer1和buffer2 代表各分区聚合的结果, 将全分区的结果进行聚合
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0)=buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
}
// 在这个buffer中可以进行其他的操作, 比如给他加某个值或者给他截取等操作
override def evaluate(buffer: Row): Any = buffer.get(0)
}
1.5.2 调用自定义的UDF
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
/*
* @Description: UDAF
* ClassName UDAFDemo
* @Author: WCH
* @CreateDate: 2019/1/8$ 11:50$
* @Version: 1.0
*/
object UDAFDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("UDFDemo").setMaster("local[2]")
val ss: SparkSession = SparkSession.builder().config(conf).getOrCreate()
// mock data
val names = Array("dazhao","yadong","dazhao","yadong","xiaodong","dazhao")
val namesDF: DataFrame = ss.createDataFrame(names.map(Person))
// register tmp table
namesDF.createOrReplaceTempView("t_person")
// register udaf
ss.udf.register("wc",new WCUDAF)
// start select
val res: DataFrame = ss.sql("select name , wc(name) from t_person group by name")
res.show
ss.stop()
}
}
1.5.3 输出结果
二 kafka
在流式计算中,Kafka一般用来缓存数据,Storm通过消费Kafka的数据进行计算。
2.1 kafka简介
- Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。
- Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高吞吐量、低等待的平台。
- Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。
- Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。
- 无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性
2.2 Kafka核心组件(重要)
- Topic :消息根据Topic进行归类
- Producer:发送消息者
- Consumer:消息接受者
- broker:每个kafka实例(server)
- Zookeeper:依赖集群保存meta信息。
2.3 Kafka整体结构图
2.4 Kafka名词解释和工作方式
- Producer :消息生产者,就是向kafka broker发消息的客户端。
- Consumer :消息消费者,向kafka broker取消息的客户端
- Topic :我们可以理解为一个队列。
- Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
- Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
2.5 Consumer与topic关系
本质上kafka只支持Topic;
-
每个group中可以有多个consumer,每个consumer属于一个consumer group;
通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高"故障容错"性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。
-
对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer;
那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个"订阅"者。
-
在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻);
一个Topic中的每个partition,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以同时消费多个partition中的消息。
-
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partition个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
2.6 Kafka消息的分发
2.6.1 Producer客户端负责消息的分发
- kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息;
- 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
-
消息由producer直接通过socket发送到broker,中间不会经过任何"路由层",事实上,消息被路由到哪个partition上由producer客户端决定;
比如可以采用"random"“key-hash”"轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的。
- 在producer端的配置文件中,开发者可以指定partition路由的方式。
2.6.2 Producer消息发送的应答机制
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack
request.required.acks=0
2.7 Consumer的负载均衡
当一个group中,有consumer加入或者离开时,会触发partitions负载均衡.负载均衡的最终目的,是提升topic的并发消费能力,步骤如下:
- 假如topic1,具有如下partitions: P0,P1,P2,P3
- 加入group中,有如下consumer: C1,C2
- 首先根据partition索引号对partitions排序: P0,P1,P2,P3
- 根据consumer.id排序: C1,C2
- 计算倍数: M = [P0,P1,P2,P3].size / [C1,C2].size,本例值M=2(向上取整)
- 然后依次分配partitions: C1 = [P0,P3],C2=[P1,P2],即Ci = [P(i * M),P((i + 1) * M -1)]
三 Kafka集群部署
3.1 集群部署的基本流程
下载安装包、解压安装包、修改配置文件、分发安装包、启动集群
3.2 集群部署的基础环境准备
安装前的准备工作(zookeeper集群已经部署完毕)
关闭防火墙
service iptables stop
chkconfig iptables off
3.3 Kafka集群部署
3.3.1 下载安装包
http://kafka.apache.org/downloads.html
3.3.2 解压安装包
tar -zxvf kafka_2.11-1.1.0.tgz -C /opt/app/
cd /opt/app/
3.3.3 修改配置文件
cp /opt/app//kafka/config/server.properties /opt/app/kafka/config/server.properties.bak
cp /opt/app//kafka/config/producer.properties /opt/app/kafka/config/producer.properties.bak
cp /opt/app//kafka/config/consumer.properties /opt/app/kafka/config/consumer.properties.bak
vi /opt/app/kafka/config/server.properties
vi /opt/app/kafka/config/producer.properties
vi /opt/app/kafka/config/consumer.properties
3.3.3.1修改server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka消息存放的路径
log.dirs=/export/servers/logs/kafka
#topic在当前broker上的分片个数
num.partitions=2
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#滚动生成新的segment文件的最大时间
log.roll.hours=168
#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#周期性检查文件大小的时间
log.retention.check.interval.ms=300000
#日志清理是否打开
log.cleaner.enable=true
#broker需要使用zookeeper保存meta数据
zookeeper.connect=node01:2181,node02:2181,node03:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000
#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000
#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=kafka01
3.3.3.2 修改producer.properties
#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=kafka01:9092,kafka02:9092
# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
# partitioner.class=kafka.producer.DefaultPartitioner
# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none
# 指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder
# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=
# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
# 0: producer不会等待broker发送ack
# 1: 当leader接收到消息之后发送ack
# -1: 当所有的follower都同步消息成功后发送ack.
request.required.acks=0
# 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
# 也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000
# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000
# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500
# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
# -1: 无阻塞超时限制,消息不会被抛弃
# 0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3
# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000
3.3.3.3 consumer.properties
# zookeeper连接服务器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=5000
#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=2000
#指定消费组
group.id=group01
# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是先在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true
# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=1000
# 当前consumer的标识,可以设定,也可以由系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx
# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx
# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=50
# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数.
rebalance.max.retries=5
# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=6553600
# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest
# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder
3.4 分发安装包
cd /opt/app
scp -r kafka_2.11-0.8.2.2 kafka02:$PWD
3.5 启动集群
依次在各节点上启动kafka
nohup bin/kafka-server-start.sh config/server.properties &