Source: http://doc.okbase.net/QING____/archive/19447.html
See also:
http://blog.csdn.net/21aspnet/article/details/19325373
http://blog.csdn.net/unix21/article/details/18990123
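As a distributed log collection or system monitoring service, Kafka is worth using where it fits. Deploying Kafka involves a ZooKeeper environment and a Kafka environment, plus some configuration. The following describes how to use Kafka.
We build the ZooKeeper cluster from 3 ZooKeeper instances and the Kafka cluster from 2 Kafka brokers.
Kafka is version 0.8 and ZooKeeper is version 3.4.5.
I. Building the ZooKeeper cluster
We have 3 ZooKeeper instances: zk-0, zk-1, and zk-2. If you are only testing, a single instance is enough.
1) zk-0
Edit the configuration file: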
clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
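## Only the settings above need to change; keep the defaults for everything else.
## Because all three instances share one host, each also needs its own dataDir, with a myid file in it holding that instance's server number (0 for zk-0).
Start ZooKeeper: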
./zkServer.sh start
2) zk-1
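Edit the configuration file (all other settings are the same as for zk-0):
clientPort=2182
## Only the setting above needs to change, along with this instance's own dataDir and myid (1); keep the defaults for everything else.
Start ZooKeeper: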
./zkServer.sh start
3) zk-2
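Edit the configuration file (all other settings are the same as for zk-0):
clientPort=2183
## Only the setting above needs to change, along with this instance's own dataDir and myid (2); keep the defaults for everything else.
Start ZooKeeper: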
./zkServer.sh start
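Since the three instances run on one host, the settings that differ between them can be generated rather than edited by hand. The following is a minimal sketch of that idea, not part of ZooKeeper: the class name and the `/tmp/zk-N` data directories are assumptions made for illustration, while the ports and `server.N` lines are the ones listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (hypothetical, not part of ZooKeeper): renders the
// per-instance settings for three ZooKeeper instances sharing one host.
class ZkLocalEnsembleSketch {

    // Ensemble membership lines, identical for every instance.
    static final String[] SERVERS = {
        "server.0=127.0.0.1:2888:3888",
        "server.1=127.0.0.1:2889:3889",
        "server.2=127.0.0.1:2890:3890"
    };

    // Settings for instance `id`. The dataDir path is an assumed example.
    static List<String> configFor(int id) {
        List<String> lines = new ArrayList<String>();
        lines.add("clientPort=" + (2181 + id));
        lines.add("dataDir=/tmp/zk-" + id);
        for (String server : SERVERS) {
            lines.add(server);
        }
        return lines;
    }

    // Content of the myid file that lives inside the instance's dataDir.
    static String myidFor(int id) {
        return String.valueOf(id);
    }

    public static void main(String[] args) {
        for (int id = 0; id < 3; id++) {
            System.out.println("# zk-" + id + " (myid=" + myidFor(id) + ")");
            for (String line : configFor(id)) {
                System.out.println(line);
            }
            System.out.println();
        }
    }
}
```

Running it prints one block of settings per instance, which can be compared against each instance's configuration file.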
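II. Building the Kafka cluster
The broker configuration file refers to the ZooKeeper setup, so we show it first. We use 2 Kafka brokers, kafka-0 and kafka-1, to build this cluster.
1) kafka-0
Edit the configuration file in the config directory as follows: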
broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912
log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false
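Several of these values are raw byte or hour counts that are hard to read at a glance. As a small aid, the sketch below loads a few of the settings with `java.util.Properties` and prints them in larger units. The class and method names are hypothetical; only the keys and values come from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: converts a few broker settings into readable units.
class BrokerConfigUnitsSketch {

    // A subset of the broker settings shown in this section.
    static final String CONFIG =
        "socket.send.buffer.bytes=1048576\n" +
        "socket.request.max.bytes=104857600\n" +
        "log.segment.bytes=536870912\n" +
        "log.retention.hours=168\n";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail.
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Value of a byte-count setting, in whole MiB.
    static long mebibytes(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / (1024L * 1024L);
    }

    // Value of an hour-count setting, in whole days.
    static long days(Properties props, String key) {
        return Long.parseLong(props.getProperty(key)) / 24L;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        String[] byteKeys = {
            "socket.send.buffer.bytes", "socket.request.max.bytes", "log.segment.bytes"
        };
        for (String key : byteKeys) {
            System.out.println(key + " = " + mebibytes(props, key) + " MiB");
        }
        System.out.println("log.retention.hours = " + days(props, "log.retention.hours") + " days");
    }
}
```

With the values above, the segment size works out to 512 MiB, the maximum request size to 100 MiB, and the retention period to 7 days.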
Kafka is written in Scala, so the Scala build environment has to be prepared before Kafka can run:
> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency
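The last command may throw an exception; ignore it for now. Start the Kafka broker:
> JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &
ZooKeeper is already running, so there is no need to start ZooKeeper through Kafka. If you deploy several Kafka brokers on one machine, you must set a distinct JMX_PORT for each.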
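The port given at startup is where the broker's JVM exposes remote JMX, which is why two brokers on one machine cannot share it. As a rough illustration, the sketch below builds the standard JMX service URL for each broker and can optionally connect to count the registered MBeans. The class name and the `connect` argument are hypothetical, and it assumes the broker's JMX endpoint has authentication and SSL disabled.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: addresses each broker's JMX endpoint by its port.
class BrokerJmxSketch {

    // Standard RMI-based JMX service URL for a JVM exposing JMX on jmxPort.
    static JMXServiceURL urlFor(String host, int jmxPort) {
        try {
            return new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + jmxPort + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("bad JMX address: " + host + ":" + jmxPort, e);
        }
    }

    // Connects and returns the number of MBeans; needs a running broker.
    static int mbeanCount(JMXServiceURL url) throws IOException {
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            return connector.getMBeanServerConnection().getMBeanCount();
        } finally {
            connector.close();
        }
    }

    public static void main(String[] args) throws IOException {
        int[] jmxPorts = {9997, 9998}; // the ports used for kafka-0 and kafka-1
        for (int port : jmxPorts) {
            JMXServiceURL url = urlFor("127.0.0.1", port);
            System.out.println(url);
            // Only attempt a connection when explicitly asked to.
            if (args.length > 0 && "connect".equals(args[0])) {
                System.out.println("  MBeans: " + mbeanCount(url));
            }
        }
    }
}
```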
2) kafka-1
broker.id=1
port=9093
## All other settings are the same as for kafka-0
Then run the same packaging commands as for kafka-0 and start this broker:
> JMX_PORT=9998 bin/kafka-server-start.sh config/server.properties &
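The environment is now ready, so let's move on to the programming examples.
III. Project setup
The project is built with Maven. Frankly, the Kafka Java client is awkward to work with, and setting up the build causes a lot of trouble. The following pom.xml is a suggested reference; the versions of the dependencies must be compatible with one another.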
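<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>
    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
    <version>1.0.0</version>
    <dependencies>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.0</artifactId>
            <version>0.8.0-beta1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.yammer.metrics</groupId>
            <artifactId>metrics-core</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>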
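Because dependency mismatches tend to surface only at run time, it can help to check early that one class from each dependency is actually on the classpath. The sketch below is one hypothetical way to do that with `Class.forName`; the class name `ClasspathCheckSketch` is made up, and the listed class names are representative picks from each artifact rather than an exhaustive check.

```java
// Hypothetical sanity check: reports which expected classes can be found.
class ClasspathCheckSketch {

    // True if the class can be located without initializing it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            // Found, but one of its own dependencies is missing or incompatible.
            return false;
        }
    }

    public static void main(String[] args) {
        String[] expected = {
            "kafka.javaapi.producer.Producer",          // kafka
            "kafka.javaapi.consumer.ConsumerConnector", // kafka
            "scala.Option",                             // scala-library
            "com.yammer.metrics.Metrics",               // metrics-core
            "org.I0Itec.zkclient.ZkClient",             // zkclient
            "org.apache.log4j.Logger"                   // log4j
        };
        for (String name : expected) {
            System.out.println((isOnClasspath(name) ? "found    " : "MISSING  ") + name);
        }
    }
}
```

Run it with the project's runtime classpath; any line marked MISSING points at a dependency that did not resolve.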
IV. Producer code
1) producer.properties: place this file in the /resources directory
#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
## Only takes effect when producer.type=async
#batch.num.messages=100
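The `partitioner.class` line is left commented out, so no custom partitioner is configured. For readers wondering what a partitioner does, the sketch below shows the general hash-and-modulo idea of mapping a message key to a partition number. It is an illustration in plain Java only, not Kafka's implementation, and the class and method names are hypothetical.

```java
// Hypothetical illustration of key-based partition selection.
class HashPartitionSketch {

    // Maps a key to a partition in the range [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so a negative hash code cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"host-a", "host-b", "host-c"};
        for (String key : keys) {
            // 2 matches num.partitions in the broker configuration.
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

The same key always lands on the same partition, which is the property a key-based partitioner is meant to provide.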
2) LogProducer.java sample code
package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    // Key and message are both Strings, matching
    // serializer.class=kafka.serializer.StringEncoder
    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        // Load producer.properties from the classpath
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    // Send a single message to the given topic
    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    // Send a batch of messages to the given topic in one call
    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            // Send one message every 2 seconds until the process is stopped
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
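The `Collection` overload of `send` hands the whole collection to the producer in a single call. When the input is large, one option is to split it into fixed-size chunks first and call `send` once per chunk. The helper below is a hypothetical, Kafka-independent sketch of just that splitting step; the class name and the chunk size of 2 are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits a message list into fixed-size chunks.
class MessageChunkSketch {

    // Returns consecutive chunks of at most chunkSize messages each.
    static List<List<String>> chunk(List<String> messages, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<String>> chunks = new ArrayList<List<String>>();
        for (int from = 0; from < messages.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, messages.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<String>(messages.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a", "b", "c", "d", "e");
        for (List<String> part : chunk(lines, 2)) {
            // In the real program each part would go to
            // LogProducer.send("test-topic", part).
            System.out.println(part);
        }
    }
}
```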
V. Consumer
1) consumer.properties: this file is located in the /resources directory
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
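A misspelled key in a properties file generally does not take effect, and the mistake is easy to overlook. As a hedge against that, the sketch below loads properties text and lists any key that is not in an expected set. The class name is hypothetical, and the expected set is an assumption covering only the keys used in this section, not Kafka's full list of consumer settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: flags property keys that this tutorial does not expect.
class ConsumerKeyCheckSketch {

    // Assumed whitelist: only the consumer keys that appear in this section.
    static final Set<String> EXPECTED = new HashSet<String>(Arrays.asList(
        "zookeeper.connect",
        "zookeeper.connection.timeout.ms",
        "group.id",
        "consumer.timeout.ms"));

    // Returns the keys in the given properties text that are not expected, sorted.
    static Set<String> unexpectedKeys(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail.
            throw new IllegalStateException(e);
        }
        Set<String> unexpected = new TreeSet<String>(props.stringPropertyNames());
        unexpected.removeAll(EXPECTED);
        return unexpected;
    }

    public static void main(String[] args) {
        // The second key is deliberately misspelled for the demonstration.
        String sample =
            "zookeeper.connect=127.0.0.1:2181\n" +
            "zookeeper.conection.timeout.ms=1000000\n" +
            "group.id=test-group\n";
        System.out.println("unexpected keys: " + unexpectedKeys(sample));
    }
}
```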
2) LogConsumer.java sample code
package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        // Load consumer.properties from the classpath
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        // Request partitionsNum streams for the topic
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        // One worker thread per stream
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            // ignored: the connector is shut down regardless
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            // hasNext() blocks until a message arrives
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                // Decodes with the platform default charset; assumes it is UTF-8
                executor.execute(new String(item.message()));
            }
        }
    }

    interface MessageExecutor {
        public void execute(String message);
    }

    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {
                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close() is left commented out so the worker threads keep
            // consuming after main returns
            // if(consumer != null){
            //     consumer.close();
            // }
        }
    }
}
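The second constructor argument is the number of streams, and therefore worker threads, requested for the topic. How many partitions each stream ends up reading depends on how the topic's partitions are divided among the streams. The sketch below models an even, range-style split in plain Java to show the effect of choosing that number. It is a simplified model under that assumption, not Kafka's rebalancing code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: divides partition numbers evenly among streams.
class StreamAssignmentSketch {

    // Returns, for each stream, the list of partition numbers it would read.
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        if (numStreams <= 0 || numPartitions < 0) {
            throw new IllegalArgumentException("need numStreams > 0 and numPartitions >= 0");
        }
        int base = numPartitions / numStreams;   // every stream gets at least this many
        int extra = numPartitions % numStreams;  // the first `extra` streams get one more
        List<List<Integer>> result = new ArrayList<List<Integer>>();
        int next = 0;
        for (int stream = 0; stream < numStreams; stream++) {
            int count = base + (stream < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<Integer>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("2 partitions, 2 streams: " + assign(2, 2));
        System.out.println("2 partitions, 3 streams: " + assign(2, 3));
        System.out.println("5 partitions, 2 streams: " + assign(5, 2));
    }
}
```

In this model, asking for more streams than there are partitions leaves the surplus streams with nothing to read, which is why the example requests 2 streams for a topic with 2 partitions.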