Assigning partitions
A caveat when assigning partitions: for a topic that has already been created with a given number of partitions, later changing the producer's partitioner code, e.g. introducing a key to redistribute messages, does not take effect. Messages keep going to the existing partitions (in our case the topic had a single partition, partition 0). If you inspect partition_cnt inside the program, you can see that it does not change when config/server.properties is edited; num.partitions in that file only applies to topics created afterwards, while partition_cnt reflects the partition count of the already-created topic.
And if you simply hardcode the partition count in the code when computing the partition value, as in djb_hash(key->c_str(), key->size()) % 5, you get an error saying the partition does not exist, because the broker still only knows about the topic's single partition.
We can use rdkafka_example to check how many partitions a given topic has:
./rdkafka_example -L -t helloworld_kugou -b localhost:9092
The output shows that topic helloworld_kugou has only one partition, while helloworld_kugou1 has five, which matches what we expect.
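The same information is also available programmatically through librdkafka's metadata API. Below is a minimal sketch (error handling trimmed), assuming a producer handle and topic handle created the same way as in the code further down:

RdKafka::Metadata *metadata;
/* Fetch metadata for this topic only, with a 5-second timeout. */
if (producer->metadata(false, topic, &metadata, 5000) == RdKafka::ERR_NO_ERROR) {
    RdKafka::Metadata::TopicMetadataIterator it;
    for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it)
        std::cout << (*it)->topic() << " has "
                  << (*it)->partitions()->size() << " partition(s)" << std::endl;
    delete metadata; /* the caller owns the metadata object */
}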
We can also change the partition count of an already-created topic:
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 5 --topic helloworld_kugou
After the change, helloworld_kugou has five partitions. Note that kafka-topics.sh can only increase the partition count, never decrease it, and keys will generally map to different partitions once the count changes.
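Re-running the metadata listing from above confirms the new count:

./rdkafka_example -L -t helloworld_kugou -b localhost:9092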
A concrete example:
Create a topic helloworld_kugou_test with five partitions. Even before the producer sends anything, the five partition directories (helloworld_kugou_test-0 through helloworld_kugou_test-4) already exist under the configured log directory.
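For reference, the topic itself can be created with the same tool used for --alter above (assuming the same local ZooKeeper and a single broker, hence replication factor 1):

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic helloworld_kugou_test --partitions 5 --replication-factor 1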
Producer-side code:

#include <iostream>
#include <string>
#include <cstdio>
#include "rdkafkacpp.h"

/* Shared state used by the callbacks and loops below. */
static bool run = true;
static bool exit_eof = false;

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{
public:
void dr_cb (RdKafka::Message &message) {
std::cout << "Message delivery for (" << message.len() << " bytes): " <<
message.errstr() << std::endl;
if (message.key())
std::cout << "Key: " << *(message.key()) << ";" << std::endl;
}
};
class ExampleEventCb : public RdKafka::EventCb {
public:
void event_cb (RdKafka::Event &event) {
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
run = false;
break;
case RdKafka::Event::EVENT_STATS:
std::cerr << "\"STATS\": " << event.str() << std::endl;
break;
case RdKafka::Event::EVENT_LOG:
fprintf(stderr, "LOG-%i-%s: %s\n",
event.severity(), event.fac().c_str(), event.str().c_str());
break;
default:
std::cerr << "EVENT " << event.type() <<
" (" << RdKafka::err2str(event.err()) << "): " <<
event.str() << std::endl;
break;
}
}
};
/* Custom partitioner: it is only invoked when produce() is called with a key,
 * so the producer below always supplies one. */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {
public:
int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque)
{
std::cout<<"partition_cnt="<<partition_cnt<<std::endl;
return djb_hash(key->c_str(), key->size()) % partition_cnt;
}
private:
static inline unsigned int djb_hash (const char *str, size_t len)
{
unsigned int hash = 5381;
for (size_t i = 0 ; i < len ; i++)
hash = ((hash << 5) + hash) + str[i];
std::cout<<"hash1="<<hash<<std::endl;
return hash;
}
};
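Because the callback only uses the key and the partition count, it can be exercised as a quick standalone sanity check (a hypothetical snippet; the unused topic and opaque arguments are passed as NULL):

MyHashPartitionerCb cb;
std::string key = "hello";
/* Prints which of the 5 partitions the key "hello" would map to. */
std::cout << "partition for 'hello': " << cb.partitioner_cb(NULL, &key, 5, NULL) << std::endl;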
void TestProducer()
{
std::string brokers = "localhost";
std::string errstr;
std::string topic_str="helloworld_kugou_test"; // set your topic name here
MyHashPartitionerCb hash_partitioner;
int32_t partition = RdKafka::Topic::PARTITION_UA;
//Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
{
std::cerr << errstr << std::endl;
exit(1);
}
/*
* Set configuration properties
*/
conf->set("metadata.broker.list", brokers, errstr);
ExampleEventCb ex_event_cb;
conf->set("event_cb", &ex_event_cb, errstr);
ExampleDeliveryReportCb ex_dr_cb;
/* Set delivery report callback */
conf->set("dr_cb", &ex_dr_cb, errstr);
/*
* Create producer using accumulated global configuration.
*/
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer)
{
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
/*
* Read messages from stdin and produce to broker.
*/
for (std::string line; run && std::getline(std::cin, line);)
{
if (line.empty())
{
producer->poll(0);
continue;
}
/*
 * Produce message:
 *   1. topic            2. partition
 *   3. flags            4. payload
 *   5. payload length   6. key
 *   7. msg_opaque (NULL here)
 */
std::string key=line.substr(0,5); // use (up to) the first five characters of the line as the key
RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>(line.c_str()), line.size(),
key.c_str(), key.size(), NULL); // the key decides which partition the message is assigned to
if (resp != RdKafka::ERR_NO_ERROR)
std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;
else
std::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;
producer->poll(0); // poll() performs the actual socket I/O and serves queued callbacks; it returns the number of events served
}
run = true; // reset so we can wait for the delivery queue to drain
while (run && producer->outq_len() > 0) {
std::cerr << "Waiting for " << producer->outq_len() << std::endl;
producer->poll(1000);
}
delete topic;
delete producer;
}
Consumer-side code:
void msg_consume(RdKafka::Message* message, void* opaque)
{
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
std::cout << "Read msg at offset " << message->offset() << std::endl;
if (message->key())
{
std::cout << "Key: " << *message->key() << std::endl;
}
printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));
break;
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
if (exit_eof)
{
run = false;
}
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
break;
default:
/* Errors */
std::cerr << "Consume failed: " << message->errstr() << std::endl;
run = false;
}
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:
void consume_cb (RdKafka::Message &msg, void *opaque)
{
msg_consume(&msg, opaque);
}
};
void TestConsumer()
{
std::string brokers = "localhost";
std::string errstr;
std::string topic_str="helloworld_kugou_test"; // or helloworld_kugou
MyHashPartitionerCb hash_partitioner;
// PARTITION_UA cannot be used on the consumer side: the simple Consumer API reads
// from one explicit topic+partition. Automatic partition assignment requires the
// high-level RdKafka::KafkaConsumer (consumer groups) instead.
int32_t partition = 3;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
int use_ccb = 0;
//Create configuration objects
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK)
{
std::cerr << errstr << std::endl;
exit(1);
}
/*
* Set configuration properties
*/
conf->set("metadata.broker.list", brokers, errstr);
ExampleEventCb ex_event_cb;
conf->set("event_cb", &ex_event_cb, errstr);
/*
* Create consumer using accumulated global configuration.
*/
RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);
if (!consumer)
{
std::cerr << "Failed to create consumer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created consumer " << consumer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);
if (!topic)
{
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
/*
* Start consumer for topic+partition at start offset
*/
RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;
exit(1);
}
ExampleConsumeCb ex_consume_cb;
/*
* Consume messages
*/
while (run)
{
if (use_ccb)
{
consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);
}
else
{
RdKafka::Message *msg = consumer->consume(topic, partition, 1000);
msg_consume(msg, NULL);
delete msg;
}
consumer->poll(0);
}
/*
* Stop consumer
*/
consumer->stop(topic, partition);
consumer->poll(1000);
delete topic;
delete consumer;
}
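For completeness, a minimal main() wiring the two test functions together (a sketch; the original post does not show one):

int main (int argc, char **argv) {
    if (argc > 1 && std::string(argv[1]) == "consume")
        TestConsumer();
    else
        TestProducer();
    return 0;
}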
So how can the producer find out which partition a given key was routed to, and is there an API for it? Yes: the delivery report callback receives the final placement, since RdKafka::Message::partition() on the message passed to dr_cb holds the partition the message was written to. Alternatively, the expected partition can be computed up front as djb_hash(key) % partition_cnt, with the count obtained from the metadata API shown earlier.
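A minimal sketch of such a delivery report callback, printing the partition each message actually landed in (message.partition() is part of the standard RdKafka::Message interface):

class PartitionReportCb : public RdKafka::DeliveryReportCb {
public:
    void dr_cb (RdKafka::Message &message) {
        std::cout << "Delivered " << message.len() << " bytes to partition "
                  << message.partition() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << std::endl;
    }
};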