文章目录
- Kafka的使用
-
- 一、kafka基本原理
-
- 1.1 消息队列的作用
- 1.2 点对点模式
- 1.3 发布订阅模式
- 1.4 kafka
- 1.5 工作流程分析
-
- 1.5.1 发送数据
- 1.5.2 保存数据
- 1.5.3 消费数据
- 二、Kafka的c++使用
- 三、Kafka的python使用
参考:
Kafka的使用
一、kafka基本原理
1.1 消息队列的作用
- 解耦:解除消息生产者和消息消费者的依赖关系
- 异步:消息生产者发送消息后,可以做其他的事
- 削峰:缓解流量高峰
1.2 点对点模式
- 点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。
- 生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。
- 点对点模型的的优点:是消费者拉取消息的频率可以由自己控制。
- 缺点:消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
1.3 发布订阅模式
- 生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息
- 消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题。
1.4 kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展能力
Producer:Producer即生产者,消息的产生者,是消息的入口。
kafka cluster:
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication: 每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message:每一条发送的消息主体。
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据, 这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
1.5 工作流程分析
1.5.1 发送数据
producer产生消息,将消息写到leader当中,不会直接将数据写入到follower。写入的流程如下:
注:
消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
-
分区的目的:
(1)方便扩展。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
(2)提高并发。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
-
kafka分发的几个原则:
(1)partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
(2) 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
(3)如果既没指定partition,又没有设置key,则会轮询选出一个partition。
-
ACK应答机制:
(1)0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
(2)1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
(3)all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
注:如果topic不存在,kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
1.5.2 保存数据
Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
-
Partition结构
每个topic分为一个或多个Partition,Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。 -
Message结构
log文件就实际是存储message的地方,消息主要包含消息体、消息大小、offset、压缩类型……等等。
(1)offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
(2)消息大小:消息大小占用4byte,用于描述消息的大小。
(3)消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
-
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
(1)基于时间,默认配置是168小时(7天)。
(2)基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
1.5.3 消费数据
Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!
-
数据查找:
假如现在需要查找一个offset为368801的message是什么样的过程呢?下图:
(1)先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
(2)打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
(3)根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。
segment+有序offset+稀疏索引+二分查找+顺序查找
在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中!
二、Kafka的c++使用
-
生产者
kafkaproducer.h
#ifndef KAFKAPRODUCER_H
#define KAFKAPRODUCER_H
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <librdkafka/rdkafkacpp.h>
#include <vector>
#include <fstream>
using std::string;
using std::list;
using std::cout;
using std::endl;
using std::vector;
using std::fstream;
class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
void dr_cb(RdKafka::Message &message) {
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << std::endl;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
};
class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
class KafkaProducer
{
public:
KafkaProducer(const string &brokers, const string &topics, int nPpartition = 0);
virtual ~KafkaProducer();
bool Init();
void Send(const string &msg);
void Stop();
private:
RdKafka::Producer *m_pProducer = NULL;
RdKafka::Topic *m_pTopic = NULL;
KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
KafkaProducerEventCallBack m_producerEventCallBack;
std::string m_strTopics;
std::string m_strBroker;
bool m_bRun = false;
int m_nPpartition = 0;
};
#endif // KAFKAPRODUCER_H
kafkaproducer.cpp
#include <iostream>
#include "kafkaproducer.h"
KafkaProducer::KafkaProducer(const string &brokers, const string &topics, int nPpartition /*= 1*/)
: m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
}
KafkaProducer::~KafkaProducer()
{
Stop();
}
bool KafkaProducer::Init()
{
string errstr = "";
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
/*Set configuration properties,设置broker list*/
if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK){
std::cerr << "RdKafka conf set brokerlist failed :" << errstr.c_str() << endl;
}
/* Set delivery report callback */
conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
conf->set("event_cb", &m_producerEventCallBack, errstr);
/*
* Create producer using accumulated global configuration.
*/
m_pProducer = RdKafka::Producer::create(conf, errstr);
if (!m_pProducer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
return false;
}
std::cout << "% Created producer " << m_pProducer->name() << std::endl;
/*
* Create topic handle.
*/
m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics,
tconf, errstr);
if (!m_pTopic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
return false;
}
return true;
}
void KafkaProducer::Send(const string &msg)
{
if (!m_bRun)
return;
/*
* Produce message
*/
RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(msg.c_str()), msg.size(),
NULL, NULL);
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
else
std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;
m_pProducer->poll(0);
/* Wait for messages to be delivered */ //firecat add
while (m_bRun && m_pProducer->outq_len() > 0) {
std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
m_pProducer->poll(1000);
}
}
void KafkaProducer::Stop()
{
delete m_pTopic;
delete m_pProducer;
}
int main()
{
//KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("localhost:9092", "test", 0);
KafkaProducer* Kafkapr_ = new KafkaProducer("localhost:9092", "test", 0);
Kafkapr_->Init();
Kafkapr_->Send("hello world!");
char str_msg[] = "Hello Kafka!";
while (fgets(str_msg, sizeof(str_msg), stdin))
{
size_t len = strlen(str_msg);
if (str_msg[len - 1] == '\n')
{
str_msg[--len] = '\0';
}
if (strcmp(str_msg, "end") == 0)
{
break;
}
Kafkapr_->Send(str_msg);
}
return 0;
}
-
消费者
kafka_comsumer.h
#include <vector>
#include <string>
#include <memory>
#include <getopt.h>
#include <csignal>
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
class kafka_consumer_client{
public:
kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset=-1);
//kafka_consumer_client();
virtual ~kafka_consumer_client();
bool initClient();
bool consume(int timeout_ms); //消费消息
void finalize();
private:
void consumer(RdKafka::Message *msg, void *opt);
std::string brokers_;
std::string topics_;
std::string groupid_;
int64_t last_offset_ = 0;
RdKafka::Consumer *kafka_consumer_ = nullptr;
RdKafka::Topic *topic_ = nullptr;
int64_t offset_ = RdKafka::Topic::OFFSET_BEGINNING;
int32_t partition_ = 0;
};
kafka_comsumer.cpp
#include "kafka_comsumer.h"
bool run_ = true;
static void sigterm (int sig) {
run_ = false;
}
kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset):brokers_(brokers), topics_(topics),groupid_(groupid),
offset_(offset){
}
//kafka_consumer_client::kafka_consumer_client(){}
kafka_consumer_client::~kafka_consumer_client(){}
bool kafka_consumer_client::initClient(){
RdKafka::Conf *conf = nullptr;
conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(!conf){
fprintf(stderr, "RdKafka create global conf failed\n");
return false;
}
std::string errstr;
/*设置broker list*/
if (conf->set("bootstrap.servers", brokers_, errstr) != RdKafka::Conf::CONF_OK){
fprintf(stderr, "RdKafka conf set brokerlist failed : %s\n", errstr.c_str());
}
/*设置consumer group*/
if (conf->set("group.id", groupid_, errstr) != RdKafka::Conf::CONF_OK){
fprintf(stderr, "RdKafka conf set group.id failed : %s\n", errstr.c_str());
}
std::string strfetch_num = "10240000";
/*每次从单个分区中拉取消息的最大尺寸*/
if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK){
fprintf(stderr, "RdKafka conf set max.partition failed : %s\n", errstr.c_str());
}
/*创建kafka consumer实例*/
kafka_consumer_ = RdKafka::Consumer::create(conf, errstr);
if(!kafka_consumer_){
fprintf(stderr, "failed to ceate consumer\n");
}
delete conf;
RdKafka::Conf *tconf = nullptr;
/*创建kafka topic的配置*/
tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(!tconf){
fprintf(stderr, "RdKafka create topic conf failed\n");
return false;
}
/*kafka + zookeeper,当消息被消费时,会想zk提交当前groupId的consumer消费的offset信息,
当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,
在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始
消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
开始位置消费所有消息.*/
if(tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK){
fprintf(stderr, "RdKafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
}
topic_ = RdKafka::Topic::create(kafka_consumer_, topics_, tconf, errstr);
if(!topic_){
fprintf(stderr, "RdKafka create topic failed : %s\n", errstr.c_str());
}
delete tconf;
RdKafka::ErrorCode resp = kafka_consumer_->start(topic_, partition_, offset_);
if (resp != RdKafka::ERR_NO_ERROR){
fprintf(stderr, "failed to start consumer : %s\n", RdKafka::err2str(resp).c_str());
}
return true;
}
void kafka_consumer_client::consumer(RdKafka::Message *message, void *opt){
switch(message->err()){
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
printf("%.*s\n",
static_cast<int>(message->len()),
static_cast <const char*>(message->payload()));
last_offset_ = message->offset();
break;
case RdKafka::ERR__PARTITION_EOF:
std::cerr << "%% Reached the end of the queue, offset: " << last_offset_ << std::endl;
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run_ = false;
break;
default:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run_ = false;
break;
}
}
bool kafka_consumer_client::consume(int timeout_ms){
RdKafka::Message *msg = nullptr;
//消费消息
while(run_){
msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
consumer(msg, nullptr);
kafka_consumer_->poll(0);
delete msg;
}
kafka_consumer_->stop(topic_, partition_);
if(topic_){
delete topic_;
topic_ = nullptr;
}
if(kafka_consumer_){
delete kafka_consumer_;
kafka_consumer_ = nullptr;
}
/*销毁kafka实例*/
RdKafka::wait_destroyed(5000);
return true;
}
三、Kafka的python使用
- 安装kafka
pip install kafka-python
- 简单使用
# test.py
import sys
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
KAFAKA_HOST = "127.0.0.1"
KAFAKA_PORT = 9092
KAFAKA_TOPIC = "test123"
class Kafka_producer():
'''''
生产模块:根据不同的key,区分消息
'''
def __init__(self, kafkahost,kafkaport, kafkatopic, key):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.key = key
print("producer:h,p,t,k",kafkahost,kafkaport,kafkatopic,key)
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
)
print("boot svr:",bootstrap_servers)
self.producer = KafkaProducer(bootstrap_servers = bootstrap_servers
)
def sendjsondata(self, params):
try:
parmas_message = json.dumps(params,ensure_ascii=False)
producer = self.producer
print(parmas_message)
v = parmas_message.encode('utf-8')
k = key.encode('utf-8')
print("send msg:(k,v)",k,v)
producer.send(self.kafkatopic, key=k, value= v)
producer.flush()
except KafkaError as e:
print (e)
class Kafka_consumer():
'''
消费模块: 通过不同groupid消费topic里面的消息
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.key = key
self.consumer = KafkaConsumer(self.kafkatopic, group_id = self.groupid,
bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort )
)
def consume_data(self):
try:
for message in self.consumer:
yield message
print("1")
print(message)
except KeyboardInterrupt as e:
print (e)
def main(xtype, group, key):
'''
测试consumer和producer
'''
if xtype == "p":
# 生产模块
producer = Kafka_producer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, key)
print ("===========> producer:", producer)
for _id in range(100):
# params = '{"msg" : "%s"}' % str(_id)
params=[{"msg0" :_id},{"msg1" :_id}]
producer.sendjsondata(params)
time.sleep(1)
if xtype == 'c':
# 消费模块
consumer = Kafka_consumer(KAFAKA_HOST, KAFAKA_PORT, KAFAKA_TOPIC, group)
print ("===========> consumer:", consumer)
message = consumer.consume_data()
print('2')
print(message)
for msg in message:
print ('msg---------------->k,v', msg.key,msg.value)
print ('offset---------------->', msg.offset)
if __name__ == '__main__':
xtype = sys.argv[1]
group = sys.argv[2]
key = sys.argv[3]
main(xtype, group, key)
- 启动kafka服务
kafka-server-start /usr/local/etc/kafka/server.properties