
Deploying Kafka and ZooKeeper Clusters, with Kafka Java Code Examples

Source: http://doc.okbase.net/QING____/archive/19447.html

See also:

http://blog.csdn.net/21aspnet/article/details/19325373

http://blog.csdn.net/unix21/article/details/18990123

Kafka, as a distributed log-collection and system-monitoring service, is worth using in the right scenarios. Deploying Kafka involves setting up a ZooKeeper environment and a Kafka environment, plus some configuration work. This article walks through how to use Kafka.

We will build a ZooKeeper ensemble from 3 zk instances and a Kafka cluster from 2 Kafka brokers.

Kafka is version 0.8 and ZooKeeper is version 3.4.5.

1. Building the ZooKeeper Cluster

We have 3 zk instances: zk-0, zk-1, and zk-2. If you only need a test setup, a single zk instance is enough.

1) zk-0

Edit the configuration file (conf/zoo.cfg). Note that each instance also needs its own dataDir containing a myid file whose content is that instance's server number (0 for zk-0, 1 for zk-1, 2 for zk-2):

clientPort=2181

server.0=127.0.0.1:2888:3888

server.1=127.0.0.1:2889:3889

server.2=127.0.0.1:2890:3890

## Only the settings above need to change; keep the defaults for everything else

Start ZooKeeper:

./zkServer.sh start

2) zk-1

Edit the configuration file (all other settings are the same as zk-0):

clientPort=2182

## Only the setting above needs to change; keep the defaults for everything else

Start ZooKeeper:

./zkServer.sh start

3) zk-2

Edit the configuration file (all other settings are the same as zk-0):

clientPort=2183

## Only the setting above needs to change; keep the defaults for everything else

Start ZooKeeper:

./zkServer.sh start

2. Building the Kafka Cluster

Because the broker configuration file references the ZooKeeper setup, we show the broker configuration first. We use 2 Kafka brokers, kafka-0 and kafka-1, to build the cluster.

1) kafka-0

In the config directory, edit the configuration file (server.properties) to:


broker.id=0

port=9092

num.network.threads=2

num.io.threads=2

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.dir=./logs

num.partitions=2

log.flush.interval.messages=10000

log.flush.interval.ms=1000

log.retention.hours=168

#log.retention.bytes=1073741824

log.segment.bytes=536870912

log.cleanup.interval.mins=10

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

zookeeper.connection.timeout.ms=1000000

kafka.metrics.polling.interval.secs=5

kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

kafka.csv.metrics.dir=/tmp/kafka_metrics

kafka.csv.metrics.reporter.enabled=false
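The num.partitions setting above is the default number of partitions per topic; the producer's partitioner maps each keyed message to one of those partitions. As a rough illustration (a simplified sketch, not the actual kafka.producer.DefaultPartitioner source), keyed partition selection looks like this:

```java
// Simplified sketch: pick a partition by hashing the message key modulo the partition count.
public class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        // Math.abs guards against negative hash codes (ignoring the Integer.MIN_VALUE edge case).
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // With num.partitions=2, the same key always lands on the same partition.
        System.out.println(partitionFor("order-42", 2));
    }
}
```

This is why messages with the same key keep their relative order: they all land on the same partition.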


Because Kafka is written in Scala, running Kafka requires preparing a Scala environment first.

> cd kafka-0

> ./sbt update

> ./sbt package

> ./sbt assembly-package-dependency

The last command may throw an exception; ignore it for now. Start the Kafka broker:

> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &

Since the ZooKeeper ensemble is already running, there is no need to start ZooKeeper through Kafka. If you deploy multiple Kafka brokers on one machine, each needs a distinct JMX_PORT.

2) kafka-1

broker.id=1

port=9093

## All other settings are the same as kafka-0

Then run the same build commands as for kafka-0, and start this broker:

> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &

At this point the environment is ready, so let's move on to the programming examples.

3. Project Setup

The project is built with Maven. Frankly, the Kafka Java client is painful to set up, and you will run into plenty of dependency trouble. The pom.xml below is a recommended starting point; the dependency versions must all be mutually compatible.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>
    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.0</artifactId>
            <version>0.8.0-beta1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>


4. Producer Code

1) producer.properties: place this file in the /resources directory


#partitioner.class=

metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093


producer.type=sync

compression.codec=0

serializer.class=kafka.serializer.StringEncoder

## effective only when producer.type=async

#batch.num.messages=100
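With producer.type=sync each send is a blocking call; switching to async makes the client buffer messages and ship them in batches of batch.num.messages. The buffering idea, reduced to a minimal sketch (not the real client code):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of async batching: accumulate messages, flush when the batch is full.
public class BatchBuffer {

    private final int batchSize;
    private final List<String> buffer = new ArrayList<String>();
    private int flushes = 0;

    public BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    public void add(String message) {
        buffer.add(message);
        // Once batch.num.messages worth of messages has accumulated, send them as one batch.
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushes++; // a real producer would send the whole batch over the network here
        buffer.clear();
    }

    public int getFlushes() {
        return flushes;
    }
}
```

The trade-off is the usual one: async batching raises throughput but risks losing the buffered messages if the process dies before a flush.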


2) LogProducer.java sample code


package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}


5. Consumer Code

1) consumer.properties: place this file in the /resources directory


zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183


# timeout in ms for connecting to zookeeper

zookeeper.connectiontimeout.ms=1000000

#consumer group id

group.id=test-group

#consumer timeout

#consumer.timeout.ms=5000
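Consumers sharing a group.id divide the topic's partitions among themselves, each partition going to exactly one consumer in the group. A simplified round-robin sketch of that division (the real rebalance logic in kafka.consumer is more involved):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of dividing a topic's partitions among the consumers in one group.
public class AssignmentSketch {

    // Assigns partition ids 0..numPartitions-1 across consumers round-robin;
    // element c of the result is the list of partitions owned by consumer c.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> out = new ArrayList<List<Integer>>();
        for (int c = 0; c < numConsumers; c++) {
            out.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            out.get(p % numConsumers).add(p);
        }
        return out;
    }
}
```

This is also why running more consumers in a group than the topic has partitions leaves the extra consumers idle.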


2) LogConsumer.java sample code


package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {

        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                executor.execute(new String(item.message())); // UTF-8
            }
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // if (consumer != null) {
            //     consumer.close();
            // }
        }
    }
}
