天天看點

java連接配接kafka api_Kafka-JavaAPI(Producer And Consumer)

Kafka--JAVA API(Producer和Consumer)

Kafka 版本2.11-0.9.0.0

producer

package com.yzy.spark.kafka;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer extends Thread{

private String topic;

//--1

private Producer producer;

public KafkaProducer(String topic){

this.topic=topic;

Properties properties = new Properties(); //--2

properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);

properties.put("serializer.class","kafka.serializer.StringEncoder");

properties.put("request.require.acks","1");

// properties.put("partitioner.class","com.yzy.spark.kafka.MyPartition");

ProducerConfig config=new ProducerConfig(properties);

producer=new Producer<>(config);

}

@Override

public void run() {

int messageNo=1;

while (true){

String message="message"+ messageNo;

producer.send(new KeyedMessage("test2",String.valueOf(messageNo),message)); //--4

System.out.println("Sent:"+message);

messageNo++;

try {

Thread.sleep(1000);

}catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

1.定義Producer對象,這裡要注意泛型類型,之後的KeyedMessage的泛型類型和Producer相同。

2.建立Producer對象需要傳入一個ProducerConfig對象,而ProducerConfig對象需要由Properties對象構造,properties的屬性設定可以檢視ProducerConfig源碼,注釋很詳細(個别屬性在ProducerConfig父類AsyncProducerConfig 和 SyncProducerConfigShared中)。

3.該屬性可以指定partitioner,如果不設定預設會設為kafka.producer.DefaultPartitioner。

4.看看KeyedMessage的源碼:

case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {

if(topic == null)

throw new IllegalArgumentException("Topic cannot be null.")

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

def this(topic: String, key: K, message: V) = this(topic, key, key, message)

def partitionKey = {

if(partKey != null)

partKey

else if(hasKey)

key

else

null

}

def hasKey = key != null

}

參數有4個,topic必填,message是消息,通常隻填這兩個參數即可發送消息。key和partKey是用于partition的參數,partKey的優先級高于key,但是partKey隻對目前消息起作用,key和partKey隻能是String類型。下面來看看partition政策和key。

partition

先在伺服器端将topic test2的partitions設定為3

kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test2

然後回到用戶端看看kafka.producer.DefaultPartitioner源碼

package kafka.producer

import kafka.utils._

import org.apache.kafka.common.utils.Utils

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {

private val random = new java.util.Random

def partition(key: Any, numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

}

該類有一個方法 def partition(key: Any, numPartitions: Int),第一個參數為上文所說的key或partKey,第二個為partitions的數量,傳入的值就是在伺服器設定的值(3),将key的hashCode對numPartitions取餘得到結果(選擇對應編号的partition)

我們可以自己定義一個partition.class并配置到properties屬性中,這裡給一個簡單的例子:

package com.yzy.spark.kafka;

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class MyPartition implements Partitioner {

public MyPartition(VerifiableProperties properties){

}

@Override

public int partition(Object key, int numPartitions) {

System.out.println("numPartitions:"+numPartitions);

return key.hashCode()%numPartitions;

}

}

Consumer

package com.yzy.spark.kafka;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class KafkaConsumer extends Thread{

private String topic;

private String groupId;

public KafkaConsumer(String topic,String groupId){

this.topic=topic;

this.groupId=groupId;

}

private ConsumerConnector createConnector(){

Properties properties=new Properties();//--1

properties.put("zookeeper.connect",KafkaProperties.ZK);

properties.put("group.id",groupId);

properties.put("auto.offset.reset", "largest");//--2

ConsumerConfig consumerConfig = new ConsumerConfig(properties);

return Consumer.createJavaConsumerConnector(consumerConfig);

}

@Override

public void run() {

ConsumerConnector consumerConnector=this.createConnector();

Map topicCountMap=new HashMap<>();

topicCountMap.put(topic,1);

Map>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);

KafkaStream stream = messageStreams.get(topic).get(0);

ConsumerIterator iterator = stream.iterator();

while(iterator.hasNext()){

String message=new String(iterator.next().message());

}

}

}

Consumer相關的東西比較多,涉及到group和partition機制,以官方文檔為準。

1.properties和producer一樣看源碼配置。

2.這個屬性和shell指令中的--from-beginning對應。可以填smallest(從頭讀取)和largest(預設值,讀取最新的元素,嚴格來說是最新的offset位置開始讀取)。注意:每一次一個新的consumer試圖去消費一個topic時,都是從所在group的largest offset位置讀取,即也可通過設定group.id來實作from-beginning,隻要将每個consumer的group.id都設定為一個新值即可,例如properties.put("group.id", UUID.randomUUID().toString());