
A Detailed Guide to Kafka Basics

Kafka Basics

Test cases for the zookeeper and bootstrap-server options; they are explained in detail further below.

Test case (zk: 2181 / b-s: 9092)             Result
whitelist + zookeeper                        works
whitelist + bootstrap-server                 works
blacklist + zookeeper                        works
blacklist + bootstrap-server                 a whitelist must also be given (whitelist placed after blacklist)
blacklist + zookeeper                        works
blacklist + bootstrap-server                 a whitelist must also be given (whitelist placed before blacklist)
blacklist + whitelist + zookeeper            only one of the two may be given

1. Command-line clients (testing)

1.1 Topic operations

kafka-topics.sh overview (CRUD: c = create, r = retrieve, u = update, d = delete)

1. What the script does:
  Create: create a topic
  delete: delete a topic
  describe: show the details of a topic
  change a topic: update a topic
  
2. Key parameters:
--alter                Modify a topic.
--create               Create a new topic.
--delete               Delete a topic.
--describe             List details for the given topics.
--list                 List all available topics in the Kafka cluster.
--partitions           The number of partitions, when creating or altering a topic.
--replication-factor   The number of replicas per partition, when creating a topic (it cannot be combined with --alter, as shown below).
--topic                The topic name.
--zookeeper            The ZooKeeper connection string of the cluster.
           

① Creating topics

Requirement 1: create a topic named hadoop with 1 partition and a replication factor of 1
Requirement 2: create a topic named spark with 2 partitions and a replication factor of 3
Requirement 3: create a topic named flink with 3 partitions and a replication factor of 3

In practice:
[[email protected] ~]# kafka-topics.sh --create --topic hadoop --zookeeper node01:2181 --partitions 1 --replication-factor 1
Created topic "hadoop".
[[email protected] ~]# kafka-topics.sh --create --topic spark --zookeeper node01:2181,node02:2181,node03:2181 --partitions 2 --replication-factor 3
Created topic "spark".
[[email protected] ~]# kafka-topics.sh --create --topic flink --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 3
Created topic "flink".

Caveat:
[[email protected] ~]# kafka-topics.sh --create --topic storm --zookeeper node01:2181,node02:2181,node03:2181 --partitions 3 --replication-factor 4
Error while executing topic command : Replication factor: 4 larger than available brokers: 3.
 Reason: replicas are spread across nodes. For safety, two replicas of the same partition are never placed on the same node (if they were, one failed disk could wipe out several copies of the same data at once).
           

② Querying topics

Option 1: --list shows the names of all topics that currently exist in the Kafka cluster.
Option 2: --describe shows the details of those topics (topic name, partition count, replica placement, and the role of each replica → leader or follower; at any given moment only the leader replica of a partition serves reads and writes).

In practice:
[[email protected] ~]# kafka-topics.sh --zookeeper node01:2181 --list
flink
hadoop
spark
[[email protected] ~]# kafka-topics.sh --zookeeper node01:2181 --describe
Topic:flink     PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: flink    Partition: 0    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
        Topic: flink    Partition: 1    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101
        Topic: flink    Partition: 2    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
     
PartitionCount: the number of partitions of the topic
ReplicationFactor: the replication factor of the topic, i.e. the total number of replicas (the leader is included in the count, just like the replica count in HDFS)
Partition: the partition number, starting at 0
Leader: the broker.id of the broker that currently acts as leader for this partition
Replicas: the list of broker.ids that hold a replica of this partition
Isr: the "in-sync replicas" — the broker.ids whose replicas are currently caught up with the leader
           

③ Altering topics

1. The replication factor cannot be changed; trying to do so fails:
[[email protected] ~]# kafka-topics.sh --alter  --zookeeper node02:2181   --topic hadoop --replication-factor 2
Option "[replication-factor]" can't be used with option"[alter]"

2. The partition count can be changed:
[[email protected] ~]# kafka-topics.sh --alter  --zookeeper node02:2181   --topic hadoop --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[[email protected] ~]# kafka-topics.sh   --zookeeper node02:2181   --topic hadoop --describe
Topic:hadoop    PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: hadoop   Partition: 0    Leader: 102     Replicas: 102   Isr: 102
        Topic: hadoop   Partition: 1    Leader: 103     Replicas: 103   Isr: 103

Note:
① The partition count can only be increased, never decreased. In practice:
[[email protected] ~]# kafka-topics.sh --alter  --zookeeper node02:2181   --topic hadoop --partitions 1
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic hadoop currently has 2 partitions, 1 would not be an increase.
[2019-11-12 11:29:04,668] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic hadoop currently has 2 partitions, 1 would not be an increase.
 (kafka.admin.TopicCommand$)
 
② The topic name cannot be changed; when altering a topic, the name merely identifies which topic to alter.
           

④ Deleting topics

Delete the topic named hadoop. In practice:
[[email protected] flink-0]# kafka-topics.sh --list --zookeeper node01:2181
flink
hadoop
spark
[[email protected] flink-0]# kafka-topics.sh --delete --topic hadoop --zookeeper node03:2181
Topic hadoop is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[[email protected] flink-0]# cd ..
[[email protected] kafka-logs]# kafka-topics.sh --list --zookeeper node01:2181
flink
spark
[[email protected] kafka-logs]# ll
total 20
-rw-r--r-- 1 root root   4 Nov 12 11:34 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 141 Nov 12 10:51 flink-0
drwxr-xr-x 2 root root 141 Nov 12 10:51 flink-1
drwxr-xr-x 2 root root 141 Nov 12 10:51 flink-2
-rw-r--r-- 1 root root   4 Nov 12 11:34 log-start-offset-checkpoint
-rw-r--r-- 1 root root  56 Nov 12 10:29 meta.properties
-rw-r--r-- 1 root root  54 Nov 12 11:34 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root  54 Nov 12 11:35 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Nov 12 10:51 spark-0
drwxr-xr-x 2 root root 141 Nov 12 10:51 spark-1

Note:
 ① In Kafka 1.0.2 the parameter delete.topic.enable in server.properties defaults to true, so the topic is deleted physically. (In older versions such as 0.10.0.1 the delete was only logical: the topic was merely marked for deletion.)
 ② This can be confirmed through ZooKeeper: the topic's metadata has been removed there as well.
[zk: node03(CONNECTED) 11] ls /brokers/topics
[flink, spark]
           

1.2 Publishing and subscribing to messages

kafka-console-producer.sh → publishes (produces) messages

Its parameters:
--broker-list <String: broker-list>      REQUIRED: The broker list string in    
                                           the form HOST1:PORT1,HOST2:PORT2. Identifies the Kafka brokers of the cluster.
                                           
--topic <String: topic>                  REQUIRED: The topic id to produce      
                                           messages to. The topic the messages are published to.
                                           
The remaining parameters can be left at their defaults.
Notes:
① After the script is started (for example kafka-console-producer.sh --broker-list node01:9092 --topic spark, using the hosts of this article), it blocks, running a process named ConsoleProducer.
② Each line typed on the console is one message; pressing Enter sends it to the message queue (MQ) of the Kafka cluster, where it is stored.
           

kafka-console-consumer.sh → subscribes to (consumes) messages

Parameters:
--blacklist <String: blacklist>           Specifies a blacklist. Use it when you care about
                                           most topics and want to exclude only a few: put the unwanted topic names on the blacklist.
--whitelist <String: whitelist>           Specifies a whitelist. Use it when you care about
                                           only a few topics: put the topic names you do want on the whitelist.
--zookeeper <String: urls>                For old Kafka versions: the consumer offset is maintained in ZooKeeper. The offset records how far the subscriber has read, i.e. how many messages it has consumed.
--bootstrap-server <String: server to>    For new Kafka versions: the consumer offset is maintained by the Kafka cluster itself, in an internal topic named __consumer_offsets.
--from-beginning                          Consume from the beginning. Without it, only messages produced after the consumer starts are received (so the subscriber has to be started in advance).
                                           
Notes:
① After the script is started it blocks, running a process named ConsoleConsumer.
② It reads the messages stored in the partitions of the given topic(s):
 a) with --from-beginning it reads all data in every partition of the topic;
 b) without --from-beginning the subscriber sees no historical messages, only messages produced after the process started.
③ With the --zookeeper parameter, the consumer offset is maintained in ZooKeeper.
④ In newer Kafka versions, maintaining the offset in ZooKeeper triggers a deprecation warning:
[[email protected] kafka-logs]# kafka-console-consumer.sh --topic spark  --zookeeper node01:2181
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
how do you do?
⑤ In newer Kafka versions the recommended approach is to let the Kafka cluster itself maintain the offsets. A topic named __consumer_offsets is created automatically; here it has the default 50 partitions, each with a single replica (both values can be customized in server.properties via offsets.topic.num.partitions and offsets.topic.replication.factor):
[[email protected] ~]# kafka-topics.sh --describe --topic __consumer_offsets --zookeeper node01:2181
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:1     Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 102     Replicas: 102   Isr: 102
        Topic: __consumer_offsets       Partition: 1    Leader: 103     Replicas: 103   Isr: 103
        Topic: __consumer_offsets       Partition: 2    Leader: 101     Replicas: 101   Isr: 101
        ...
        
⑥ About maintaining offsets:
 a) In real projects the offset is usually maintained manually, so that it is owned exclusively by one group of processes of the same type (a minimal Java sketch follows this list).
 b) Many stores can be used to keep the offset:
   zookeeper
   redis  → used most often
   hbase
   rdbms (mysql, oracle, ...)
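
To make a) above concrete, here is a minimal Java sketch of manual offset handling, written against the KafkaConsumer API covered in section 2 below. The broker list and the spark topic are the ones used elsewhere in this article, and the group id manual-offset-demo is made up for the example; auto-commit is switched off and the offset is committed only after the whole batch has been processed:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class ManualOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); // assumed broker list
        props.put("group.id", "manual-offset-demo");                           // hypothetical consumer group
        props.put("enable.auto.commit", "false");                              // we commit the offset ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (Consumer<Integer, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("spark"));
            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                for (ConsumerRecord<Integer, String> record : records) {
                    // process the message here (or persist the offset to redis/hbase/an RDBMS instead)
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
                consumer.commitSync(); // commit only after the whole batch was processed successfully
            }
        }
    }
}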
 
⑦ Whitelist:
Case 1: offset maintained by Kafka:
[[email protected] bin]# kafka-console-consumer.sh --whitelist 'storm|spark'  --bootstrap-server node02:9092,node01:9092,node03:9092 --from-beginning
storm storm
ok ok ok
are you ok?
hehe da da

Case 2: offset maintained by ZooKeeper (no longer recommended on newer versions):
[[email protected] bin]# kafka-console-consumer.sh --whitelist storm,spark  --zookeeper node01:2181  --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
are you ok?
yes, I do.
ok ok ok
hehe

⑧ Blacklist:
Case 1: offset maintained by Kafka:
[[email protected] bin]# kafka-console-consumer.sh --blacklist storm  --bootstrap-server node02:9092,node01:9092,node03:9092 --from-beginning
Exactly one of whitelist/topic is required.
Note: used this way, --blacklist cannot stand on its own; it has to be combined with --whitelist or --topic, which makes it rather clumsy. In general, simply avoid --blacklist.

Case 2: offset maintained by ZooKeeper (not recommended):
[[email protected] bin]# kafka-console-consumer.sh --blacklist storm,spark  --zookeeper node01:2181 --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
flink 哦
           

2. Using the Java API to work with the Kafka cluster

2.1 Preparation

① Create a Maven project and add the Kafka client dependency to the pom.
② Start the ZooKeeper cluster and the Kafka cluster.
③ Copy the two resource files into the project's resources directory:
   producer.properties → configures the message publisher
   consumer.properties → configures the message subscriber
   (a sketch of the minimal content these files need follows this list)
④ Get familiar with the classes involved:
 Publishing messages:
   KafkaProducer<Key, Value> → the core class for publishing; if a message is given no key, the key defaults to null
   ProducerRecord → wraps a single message
 Subscribing to messages:
   KafkaConsumer
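
The article does not show what the two properties files contain. As a rough sketch (not taken from the original text: the broker list matches the hosts used in this article, the group id is made up, and the key/value types match the Integer/String demos below; in the files themselves these would be plain key=value lines), the demos rely on roughly the following settings:

import java.util.Properties;

// Programmatic equivalent of the two properties files loaded by the demos below (assumed values).
public class DemoConfig {
    public static Properties producerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); // assumed broker list
        p.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }

    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); // assumed broker list
        p.put("group.id", "java-api-demo");                                // hypothetical consumer group
        p.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }
}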
           

2.2 Publishing messages

Approach 1: publish a single message

Source code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyMsgProducerDemo {
    public static void main(String[] args) {
        Producer<Integer, String> producer = null;
        try {
            //① Build a Properties instance and load the settings from producer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgProducerDemo.class.getClassLoader().getResourceAsStream("producer.properties"));
            //② Create the KafkaProducer instance
            producer = new KafkaProducer<>(properties);
            //③ Prepare the message
            ProducerRecord<Integer, String> record = new ProducerRecord<>("flink", 1, "here we go!");
            //④ Publish the message
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //⑤ Release resources
            if (producer != null) {
                producer.close();
            }
        }
    }
}
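
A note on the ProducerRecord constructor used in step ③ above: "flink" is the topic, 1 is the message key, and the string is the value. With the default partitioner the key is hashed to choose the partition, so records with the same key always land in the same partition; a record can also be built without a key, in which case this Kafka version spreads the keyless records over the available partitions in round-robin fashion. Both constructors, with hypothetical values for illustration:

ProducerRecord<Integer, String> keyed = new ProducerRecord<>("flink", 1, "value with key 1");
ProducerRecord<Integer, String> keyless = new ProducerRecord<>("flink", "value without a key"); // key will be null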
           

Approach 2: publish several messages

Source code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class MyMsgProducerDemo2 {
    public static void main(String[] args) {
        //Steps:
        Producer<Integer, String> producer = null;
        try {
            //① Build a Properties instance and load the settings from producer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgProducerDemo2.class.getClassLoader().getResourceAsStream("producer.properties"));
            //② Create the KafkaProducer instance
            producer = new KafkaProducer<>(properties);
            //③ Publish several messages in a loop
            for (int i = 2; i <= 11; i++) {
                //a) prepare the message
                ProducerRecord<Integer, String> record = new ProducerRecord<>("flink", i, i + "\t→ here we go!");
                //b) publish the message
                producer.send(record);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //④ Release resources
            if (producer != null) {
                producer.close();
            }
        }
    }
}
           

Approach 3: track each message after it is published

Source code:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class MyMsgProducerDemo3 {
    public static void main(String[] args) {
        //Steps:
        Producer<Integer, String> producer = null;
        try {
            //① Build a Properties instance and load the settings from producer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgProducerDemo3.class.getClassLoader().getResourceAsStream("producer.properties"));
            //② Create the KafkaProducer instance
            producer = new KafkaProducer<>(properties);
            //③ Publish several messages in a loop
            for (int i = 1; i <= 10; i++) {
                //a) prepare the message
                ProducerRecord<Integer, String> record = new ProducerRecord<>("flink", i, i + "\t→ here we go!");
                //b) publish the message, with a callback to track where it ended up
                producer.send(record, new Callback() {
                    /**
                     * Invoked once the message has been sent (or the send has failed).
                     *
                     * @param metadata  metadata of the stored record (topic, partition, offset)
                     * @param exception non-null if the send failed
                     */
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.printf("topic: %s, value: %s, partition: %d, offset: %d%n",
                                metadata.topic(), record.value(), metadata.partition(), metadata.offset());
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //④ Release resources
            if (producer != null) {
                producer.close();
            }
        }
    }
}
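
The Callback above reports the metadata asynchronously. send() also returns a Future, so if the metadata is needed synchronously, the send inside the loop above could instead be written as the following blocking variant (it compiles there because the surrounding try block already catches Exception):

                //b) blocking variant: wait for the broker acknowledgement, then read the metadata directly
                RecordMetadata metadata = producer.send(record).get();
                System.out.printf("topic: %s, value: %s, partition: %d, offset: %d%n",
                        metadata.topic(), record.value(), metadata.partition(), metadata.offset());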
           

2.3 Subscribing to messages

Subscribe only to messages newly published from now on (latest)

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MyMsgConsumerDemo {
    public static void main(String[] args) {
        //Steps:
        Consumer<Integer, String> consumer = null;
        try {
            //① Build a Properties instance and load the settings from consumer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgConsumerDemo.class.getClassLoader().getResourceAsStream("consumer.properties"));
            //② Create the KafkaConsumer instance
            consumer = new KafkaConsumer<>(properties);
            //③ Subscribe to the topic
            consumer.subscribe(Arrays.asList("flink"));
            //④ Fetch messages (a single poll)
            ConsumerRecords<Integer, String> records = consumer.poll(5000);
            //⑤ Walk through the fetched records
            for (ConsumerRecord<Integer, String> record : records) {
                String topic = record.topic();
                int partition = record.partition();
                long offset = record.offset();
                String value = record.value();
                Integer key = record.key();
                System.out.printf("Message details:%ntopic → %s, partition → %d, offset → %d, value → %s, key → %d%n%n",
                        topic, partition, offset, value, key
                );
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //⑥ Release resources
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
           

Subscribe to messages in a loop

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;

public class MyMsgConsumerDemo2 {
    public static void main(String[] args) {
        //Steps:
        Consumer<Integer, String> consumer = null;
        try {
            //① Build a Properties instance and load the settings from consumer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgConsumerDemo2.class.getClassLoader().getResourceAsStream("consumer.properties"));
            //② Create the KafkaConsumer instance
            consumer = new KafkaConsumer<>(properties);
            //③ Subscribe to the topic
            consumer.subscribe(Arrays.asList("flink"));
            //④ Receive messages in an endless loop
            while (true) {
                //a) fetch the next batch
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                //b) walk through the fetched records
                for (ConsumerRecord<Integer, String> record : records) {
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    String value = record.value();
                    Integer key = record.key();
                    System.out.printf("Message details:%ntopic → %s, partition → %d, offset → %d, value → %s, key → %d%n%n",
                            topic, partition, offset, value, key
                    );
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //⑤ Release resources
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
           

Subscribe to every topic of interest from the very beginning

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class MyMsgConsumerDemo3 {
    public static void main(String[] args) {
        //Steps:
        Consumer<Integer, String> consumer = null;
        try {
            //① Build a Properties instance and load the settings from consumer.properties on the classpath
            Properties properties = new Properties();
            properties.load(MyMsgConsumerDemo3.class.getClassLoader().getResourceAsStream("consumer.properties"));
            //② Create the KafkaConsumer instance
            consumer = new KafkaConsumer<>(properties);
            //③ Subscribe to the topics; the rebalance listener lets us reposition the consumer
            final Consumer<Integer, String> finalConsumer = consumer;
            consumer.subscribe(Arrays.asList("flink", "storm", "spark"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                }
                /**
                 * Start consuming each assigned partition from its beginning.
                 * @param partitions the partitions assigned to this consumer
                 */
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    finalConsumer.seekToBeginning(partitions);
                }
            });
            //④ Receive messages in a loop
            while (true) {
                //a) fetch the next batch
                ConsumerRecords<Integer, String> records = consumer.poll(1000);
                //b) walk through the fetched records
                for (ConsumerRecord<Integer, String> record : records) {
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();
                    String value = record.value();
                    Integer key = record.key();
                    System.out.printf("Message details:%ntopic → %s, partition → %d, offset → %d, value → %s, key → %d%n%n",
                            topic, partition, offset, value, key
                    );
                }
                //c) once a poll returns nothing, all stored messages have been read: exit
                if (records.isEmpty()) {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //⑤ Release resources
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
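
A closing note on this demo: the ConsumerRebalanceListener + seekToBeginning approach rereads the topics from the earliest offset on every start. If all that is needed is "start from the earliest message the first time a new consumer group runs", the standard consumer setting auto.offset.reset=earliest (in consumer.properties, or set programmatically as sketched below) is usually enough; it only takes effect while the group has no committed offset yet.

            // Hypothetical alternative to the rebalance listener above:
            properties.setProperty("auto.offset.reset", "earliest"); // only used when the group has no committed offset
            consumer = new KafkaConsumer<>(properties);
            consumer.subscribe(Arrays.asList("flink", "storm", "spark"));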