Kafka
文章目录
- Kafka
- 1、安装
-
- 集群规划
- 安装
- 2、常用命令
- 3、API
-
- 环境配置
- 生产者(Producer)
-
- 不带回调函数
- 带回调函数
- 自定义分区(Partition)
- 消费者(Consumer)
-
- 自动提交 offset
- 手动提交 offset
-
- 异步提交
- 同步提交
- 自定义拦截器(Interceptor)
-
- 原理
- 案例
- 4、Kafka监控(Eagle)
- 5、Kafka对接Flume
1、安装
集群规划
hadoop151 | hadoop152 | hadoop153 |
---|---|---|
zookeeper | zookeeper | zookeeper |
kafka | kafka | kafka |
安装
- 第一步:将 kafka_2.11-0.11.0.0.tgz 上传到服务器并解压
- 第二步:在kafka目录下创建data目录
- 第三步:修改 kafka 目录下的 config 目录下的 server.properties 文件
#broker 的全局唯一编号,不能重复 broker.id=1 #删除 topic 功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的现成数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志存放的路径 log.dirs=/opt/module/kafka/logs #topic 在当前 broker 上的分区个数 num.partitions=1 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 #segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接 Zookeeper 集群地址 zookeeper.connect=hadoop151:2181,hadoop152:2181,hadoop153:2181
- 第四步:将kafka整个文件分发到另外两台服务器上
- 第五步:将 hadoop152和hadoop153 服务器中的 server.properties 文件中的 broker.id 参数分别修改成 2和3
- 第六步:将三台服务器上的 zookeeper 启动
- 第七步:将三台服务器上的 kafka 启动
bin/kafka-server-start.sh -daemon server.properties文件目录及文件名 #事例 bin/kafka-server-start.sh -daemon config/server.properties
2、常用命令
- 查看当前服务器中的所有 topic(–list)
bin/kafka-topics.sh --bootstrap-server [主机名:9092](集群) --list # 事例 bin/kafka-topics.sh --bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 --list
- 创建 topic(–create)
# --topic : topic名称 # --partitions : 分区数 # --replication-factor : 副本数 bin/kafka-topics.sh --zookeeper [zookeeper的主机名:端口号](集群) \ --topic topic名称 --partitions 分区数 --replication-factor 副本数 --create # 事例 bin/kafka-topics.sh \ --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \ --topic fzk --partitions 2 --replication-factor 2 --create
- 删除 topic(–delete)
bin/kafka-topics.sh --zookeeper [zookeeper的主机名:端口号](集群) --topic topic名称 --delete # 事例 bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 --topic fzk --delete
- 查看某个 Topic 的详情(–describe)
bin/kafka-topics.sh --zookeeper [zookeeper的主机名:端口号](集群) --topic topic名称 --describe # 事例 bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \ --topic fzk --describe
- 修改分区数(–alter)
bin/kafka-topics.sh --zookeeper [zookeeper的主机名:端口号](集群) \ --topic topic名称 --partitions 修改后的分区数 --alter # 事例 bin/kafka-topics.sh --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \ --topic fzk --partitions 6 --alter
- 发送消息
bin/kafka-console-producer.sh --broker-list [主机名:9092](集群) --topic topic名称 # 事例 bin/kafka-console-producer.sh \ --broker-list hadoop151:9092,hadoop152:9092,hadoop153:9092 \ --topic fzk
- 消费消息
################# 第一种(推荐) bin/kafka-console-consumer.sh --bootstrap-server [主机名:9092](集群) --topic topic名称 # 事例 bin/kafka-console-consumer.sh \ --bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 \ --topic fzk ################# 第二种 bin/kafka-console-consumer.sh --zookeeper 主机名:2181 --topic topic名称 # 事例 bin/kafka-console-consumer.sh \ --zookeeper hadoop151:2181,hadoop152:2181,hadoop153:2181 \ --topic fzk
3、API
环境配置
- 创建Maven工程,引入依赖(pom.xml)
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> </dependencies>
生产者(Producer)
不带回调函数
- 使用到的类的说明
- KafkaProducer:需要创建一个生产者对象,用来发送数据
- ProducerConfig:获取所需的一系列配置参数
- ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args) {
//1、编写配置
Properties properties = new Properties();
//kafka 集群,broker-list
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
//ack应答级别
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//等待时间
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//RecordAccumulator 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//key值序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//value值序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//2、创建生产者(producer)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
//3、发送消息
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i));
}
//4、关闭资源
if(producer != null){
producer.close();
}
}
}
带回调函数
- 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CallbackProducer {
public static void main(String[] args) {
//1、配置参数
Properties properties = new Properties();
//kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
//key值序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//value值序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//2、创建生产者(Producer)
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 0; i < 10; i++) {
//3、发送消息(带回调函数,Callback)
producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
//输出回调信息,(RecordMetadata:回调函数的信息)
System.out.println(recordMetadata.partition() + " --- " + recordMetadata.offset());
}
}
});
}
//4、关闭资源
if (producer != null){
producer.close();
}
}
}
自定义分区(Partition)
- 第一步:自定义分区类
package com.itfzk.kafka.partitioner; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; public class MyPartitioner implements Partitioner { //分区方法 public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { //分区逻辑 //返回分区数 return 分区数; } //关闭 public void close() { } //配置信息 public void configure(Map<String, ?> map) { } }
- 第二步:使用自定义分区
- 在 properties 中配置(ProducerConfig.PARTITIONER_CLASS_CONFIG)
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class PartitionerProducer { public static void main(String[] args) { //1、配置参数 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //使用自定义分区,value值:自定义分区类的全限定类名 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.itfzk.kafka.partitioner.MyPartitioner"); //2、创建生产者(Producer) KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 10; i++) { //3、发送消息(带回调函数) producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i), new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null){ //输出回调信息,(RecordMetadata:回调函数的信息) System.out.println(recordMetadata.partition() + " --- " + recordMetadata.offset()); } } }); } //4、关闭资源 if (producer != null){ producer.close(); } } }
消费者(Consumer)
自动提交 offset
- 需要用到的类
- KafkaConsumer:需要创建一个消费者对象,用来消费数据
- ConsumerConfig:获取所需的一系列配置参数
- ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class MyConsumer {
public static void main(String[] args) {
//1、配置信息
Properties properties = new Properties();
//kafka集群
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
//是否开启自动提交 offset 功能
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
//自动提交 offset 的时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//key值的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//key值的反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//2、创建消费者对象(Consumer)
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3、订阅主题,可订阅多个
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
//4、拉取订阅的消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//打印拉取到的消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
}
}
}
手动提交 offset
异步提交
- 关闭自动提交 offset 功能:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- 异步提交:
consumer.commitAsync()
package com.itfzk.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class ASyncCommitConsumer {
public static void main(String[] args) {
//1、配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交 offset 功能
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//2、创建消费者对象(Consumer)
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3、订阅主题,可订阅多个
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
//4、拉取订阅的消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//打印拉取到的消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
//5、异步提交
consumer.commitAsync(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null){
System.out.println("提交成功 -- " + map);
}
}
});
}
}
}
同步提交
- 关闭自动提交 offset 功能:
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- 同步提交:
consumer.commitSync();
package com.itfzk.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class SyncCommitConsumer {
public static void main(String[] args) {
//1、配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group_test");
//关闭自动提交 offset 功能
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//2、创建消费者对象(Consumer)
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//3、订阅主题,可订阅多个
consumer.subscribe(Arrays.asList("fff", "zzz", "kkk"));
while (true){
//4、拉取订阅的消息
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
//打印拉取到的消息
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
}
//5、同步提交,当前线程会阻塞直到 offset 提交成功
consumer.commitSync();
}
}
}
自定义拦截器(Interceptor)
原理
- interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个 interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)
- 实现接口
-
,其定义的方法包括:org.apache.kafka.clients.producer.ProducerInterceptor
- configure(Map<String, ?> map)
- 获取配置信息和初始化数据时调用
- onSend(ProducerRecord<String, String> producerRecord)
- 该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法
- onAcknowledgement(RecordMetadata recordMetadata, Exception e)
- 该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢 producer 的消息发送效率。
- close()
- 关闭 interceptor,主要用于执行一些资源清理工作如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意
-
案例
需求
- 实现一个简单的双 interceptor 组成的拦截链。
- 第一个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
- 第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数
实现
- 添加时间戳(拦截器)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class TimeInterceptor implements ProducerInterceptor<String, String> { //获取配置信息和初始化数据时调用 @Override public void configure(Map<String, ?> map) { } //该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { //在消息(value)前面添加时间戳 ProducerRecord<String, String> record = new ProducerRecord<String, String>( producerRecord.topic(), producerRecord.partition(), producerRecord.key(), System.currentTimeMillis() + " -- " + producerRecord.value()); return record; } //该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用 @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { } //关闭 interceptor,主要用于执行一些资源清理工作 @Override public void close() { } }
- 统计发送成功及失败的数量并打印(拦截器)
import org.apache.kafka.clients.producer.ProducerInterceptor; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Map; public class CountInterceptor implements ProducerInterceptor<String, String> { private int successCount = 0; private int errorCount = 0; //获取配置信息和初始化数据时调用 @Override public void configure(Map<String, ?> map) { } //该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { return producerRecord; } //该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程中失败时调用 @Override public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { //统计发送成功及失败的数量 if(e == null){ successCount++; }else{ errorCount++; } } //关闭 interceptor,主要用于执行一些资源清理工作 @Override public void close() { //打印发送成功及失败的数量 System.out.println("成功数量:" + successCount); System.out.println("失败数量:" + errorCount); } }
- 生产者
import org.apache.kafka.clients.producer.*; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class InterceptorProducer { public static void main(String[] args) { //1、配置参数 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop151:9092,hadoop152:9092,hadoop153:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //自定义拦截器配置(两个拦截器组成拦截器链) List<String> interceptorList = new ArrayList<String>(); interceptorList.add("com.itfzk.kafka.interceptor.TimeInterceptor"); interceptorList.add("com.itfzk.kafka.interceptor.CountInterceptor"); properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorList); //2、创建生产者(Producer) KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 10; i++) { //3、发送消息 producer.send(new ProducerRecord<String, String>("fzk", "fzk -- " + i)); } //4、关闭资源 if (producer != null){ producer.close(); } } }
4、Kafka监控(Eagle)
安装
- 第一步:修改 kafka 启动命令(kafka-server-start.sh)
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export JMX_PORT="9999" export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
- 第二步:将 kafka-server-start.sh 分发到其他服务器
- 第三步:将 kafka-eagle-bin-1.3.7.tar.gz 上传到服务器并解压
- 第四步:进入到 eagle的bin目录,给启动文件(ke.sh)执行权限
-
chmod 777 ke.sh
-
- 第五步:修改配置文件(eagle的conf目录下的 system-config.properties)
###################################### # multi zookeeper&kafka cluster list ###################################### kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop151:2181,hadoop152:2181,hadoop153:2181 ###################################### # kafka offset storage ###################################### cluster1.kafka.eagle.offset.storage=kafka ###################################### # enable kafka metrics ###################################### kafka.eagle.metrics.charts=true kafka.eagle.sql.fix.error=false ###################################### # kafka jdbc driver address ###################################### kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://hadoop151:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456
- 第六步:添加环境变量,并
source /etc/profile
# KE_HOME:kafka-eagle export KE_HOME=/opt/software/eagle/kafka-eagle-web-1.3.7 export PATH=$PATH:$KE_HOME/bin
- 第七步:启动Zookeeper和Kafka
- 第八步:启动Eagle
-
bin/ke.sh start
-
- 第九步:登录页面查看监控数据(用户名和密码:启动Eagle时显示)
-
http://192.168.9.102:8048/ke
-
5、Kafka对接Flume
- 第一步:编写Flume配置文件(flume-kafka.conf)
#name a1.sources = r1 a1.sinks = k1 a1.channels = c1 #source配置 a1.sources.r1.type = netcat a1.sources.r1.bind = hadoop151 a1.sources.r1.port = 44444 #sink配置 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = fzk a1.sinks.k1.kafka.bootstrap.servers = hadoop151:9092,hadoop152:9092,hadoop153:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 #channel配置 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #source-channel-sink之间的联系 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 第二步:开启kafka消费者
bin/kafka-console-consumer.sh --bootstrap-server hadoop151:9092,hadoop152:9092,hadoop153:9092 \ --topic fzk
- 第三步:启动Flume
bin/flume-ng agent -n agent名称 -c conf/ -f flume-kafka.conf文件路径及文件名 #执行 bin/flume-ng agent -n a1 -c conf/ -f job/flume-kafka.conf