天天看点

Kafka java api-消费者代码与消费分析、生产者消费者配置文件详解

1、消费者代码

用到消费者,所以也必须先把前面写过的生产者代码也贴一下吧

生产者代码与自定义partition

使用maven导包

<dependencies>
      <dependency>
           <groupId>com.alibaba.jstorm</groupId>
           <artifactId>jstorm-core</artifactId>
           <version>2.1.1</version>
           <exclusions>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-nop</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-jdk14</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka_2.8.2</artifactId>
           <version>0.8.1</version>
           <exclusions>
               <exclusion>
                   <artifactId>jmxtools</artifactId>
                   <groupId>com.sun.jdmk</groupId>
               </exclusion>
               <exclusion>
                   <artifactId>jmxri</artifactId>
                   <groupId>com.sun.jmx</groupId>
               </exclusion>
               <exclusion>
                   <artifactId>jms</artifactId>
                   <groupId>javax.jms</groupId>
               </exclusion>
               <exclusion>
                   <groupId>org.apache.zookeeper</groupId>
                   <artifactId>zookeeper</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
               <exclusion>
                   <groupId>org.slf4j</groupId>
                   <artifactId>slf4j-api</artifactId>
               </exclusion>
           </exclusions>
       </dependency>
   </dependencies>
           
/**
 * 这是一个简单的Kafka producer代码
 * 包含两个功能:
 * 1、数据发送
 * 2、数据按照自定义的partition策略进行发送
 *
 *
 * KafkaSpout的类
 */
public class KafkaProducerSimple {
    public static void main(String[] args) {
        /**
         * 1、指定当前kafka producer生产的数据的目的地
         *  创建topic可以输入以下命令,在kafka集群的任一节点进行创建。
         *  bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
         */

        String TOPIC = "orderMq";
        /**
         * 2、读取配置文件
         */
        Properties props = new Properties();
        /*
         * key.serializer.class默认为serializer.class  key的序列化使用哪个类
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*
         * kafka broker对应的主机,格式为host1:port1,host2:port2
         */
        props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
        /*
         * request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1
         * 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。
         * 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。
         * 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。
         * 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。
         * 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。
         * -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。
         * 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失
         */
        props.put("request.required.acks", "1");
        /*
         * 可选配置,如果不配置,则使用默认的partitioner partitioner.class
         * 默认值:kafka.producer.DefaultPartitioner
         * 用来把消息分到各个partition中,默认行为是对key进行hash。
         */
        props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
//        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        /**
         * 3、通过配置文件,创建生产者
         */
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        /**
         * 4、通过for循环生产数据
         */
        for (int messageNo = ; messageNo < ; messageNo++) {
            /**
             * 5、调用producer的send方法发送数据
             * 注意:这里需要指定 partitionKey,用来配合自定义的MyLogPartitioner进行数据分发
             */
            producer.send(new KeyedMessage<String, String>(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
        }
    }
}
           
public class MyLogPartitioner implements Partitioner {
    private static Logger logger = Logger.getLogger(MyLogPartitioner.class);

    public MyLogPartitioner(VerifiableProperties props) {
    }

    /**
     *
     * @param obj 传来的key 用它来进行hash分到partition
     * @param numPartitions 几个partition 如果集群中已存在该topic,那么partition数为原本存在数,否则默认是2
     * @return 生产到哪个partition
     */
    public int partition(Object obj, int numPartitions) {
        return Integer.parseInt(obj.toString())%numPartitions;
    }

}
           

注:orderMq这个topic很早就通过命令行创建好了,指定了partition是3个。

下面是消费者代码

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;
    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }
    public void run() {
        System.out.println("开始运行 " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /**
         * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
         * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
         * */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.out.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception{
        Properties props = new Properties();
        props.put("group.id", "dashujujiagoushi");//消费组组组名,任意取
        props.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");//zookeeper连接
        props.put("auto.offset.reset", "largest");//最新位置开始消费
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");//分区分配策略
        ConsumerConfig config = new ConsumerConfig(props);
        String topic1 = "orderMq";
        String topic2 = "paymentMq";
        //只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        //定义一个map
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic1, );
        //Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
        //取出 `kafkaTest` 对应的 streams
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic1);
        //创建一个容量为4的线程池
        ExecutorService executor = Executors.newFixedThreadPool();
        //创建20个consumer threads
        for (int i = ; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("消费者" + (i + ), streams.get(i)));
    }
}
           

测试:

先执行消费者程序,尽管partition目录里面的segment文件是有以前生成的数据,但是不会打印出来而是一直提示(已经标记为消费状态的就不再消费了,默认情况就是这样,可以自己设置从0开始消费)

:: [main-SendThread(mini1:)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:  after ms
:: [main-SendThread(mini1:)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:  after ms
           

需要进行生产,所以再执行生产者程序,控制台打印如下:

...
Consumer: [消费者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [消费者2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [消费者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17858], msg: [appidb145da08-bb61-42e7-b140-9fed576c2faeitcast]
Consumer: [消费者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17859], msg: [appid909a90ae-c0fb-42ac-97de-6d7438895e07itcast]
Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17714], msg: [appidb93b9355-4713-4e22-823a-756b4fe75bdfitcast]
Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17715], msg: [appidf82ca658-528a-4f40-a023-8a155c15eaa1itcast]
...
           

精简下如下

Consumer: [消费者1],  Topic: [orderMq],  PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [消费者2],  Topic: [orderMq],  PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
           

能看到三个消费者对应消费的partition。

那么考虑以下问题

在创建orderMq的时候指定partition是3个,那么如果此时我指定创建5个KafkaStream,那么会怎么消费呢?

消费者代码修改两次如下

topicCountMap.put(topic1, );
ExecutorService executor = Executors.newFixedThreadPool();
           

再次同上一样执行,输出结果能看到只有3个消费者,所以指定KafkaStream比partition多是没用的,只会有对应数量的消费者去消费对应的partition上的数据。

Consumer: [消费者2],  Topic: [orderMq],  PartitionId: [2], Offset: [26420], msg: [appid4b778b51-33c7-42de-83c2-5b85f8f2428aitcast]
Consumer: [消费者3],  Topic: [orderMq],  PartitionId: [0], Offset: [26423], msg: [appid86045c25-7b3f-4c82-ad2a-3e8e11958b28itcast]
Consumer: [消费者4],  Topic: [orderMq],  PartitionId: [1], Offset: [26562], msg: [appid213b5a91-a7bf-4a39-b585-456d95748566itcast]
           

如果指定的KafkaStream只有2呢?不做测试了,结果是其中一个消费者会消费2个partition,另外一个消费1个partition中的数据。

生产者,消费者配置文件解释

用java api不管是写生产者代码还是消费者代码都使用配置文件,那么下面列出了生产者和消费者配置文件介绍

生产者配置文件解释

#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=kafka01:,kafka02:

# 指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner

# 是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none

# 指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder

# 如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=

# 设置发送数据是否需要服务端的反馈,有三个值0,1,-1
# 0: producer不会等待broker发送ack
# 1: 当leader接收到消息之后发送ack
# -1: 当所有的follower都同步消息成功后发送ack.
request.required.acks=

# 在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
request.timeout.ms=

# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量,
也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync

# 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
# 此值和batch.num.messages协同工作.
queue.buffering.max.ms = 

# 在async模式下,producer端允许buffer的最大消息量
# 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
# 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=

# 如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=

# 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后
# 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
# 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间
# -1: 无阻塞超时限制,消息不会被抛弃
# 0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-


# 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
# 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
# 有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=

# producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
# 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
# (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=
           

消费者配置文件解释

# zookeeper连接服务器地址
zookeeper.connect=zk01:,zk02:,zk03:

# zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉
zookeeper.session.timeout.ms=

#当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡
zookeeper.connection.timeout.ms=

# 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息
zookeeper.sync.time.ms=

#指定消费组
group.id=xxx

# 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息
# 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true
auto.commit.enable=true

# 自动更新时间。默认60 * 1000
auto.commit.interval.ms=

# 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察
conusmer.id=xxx

# 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生
client.id=xxxx

# 最大取多少块缓存到消费者(默认10)
queued.max.message.chunks=

# 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数.
rebalance.max.retries=

# 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存
fetch.min.bytes=

# 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer
fetch.wait.max.ms=
socket.receive.buffer.bytes=

# 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest
auto.offset.reset=smallest

# 指定序列化处理类
derializer.class=kafka.serializer.DefaultDecoder