1. Environment Preparation
The Erlang Kafka client library used here is brod: https://github.com/klarna/brod. EMQ is v2.3.5: https://github.com/emqtt/emq-relx. For preparing the Kafka runtime environment, see http://blog.csdn.net/libaineu2004/article/details/79202408. We implement the integration as an EMQ plugin; my plugin path is /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps.
Copy emq_plugin_template and rename the copy to emq_plugin_kafka_brod; note that the related configuration files and source files must be renamed as well.
Enter the directory /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod.
In its Makefile, add the following lines:
BUILD_DEPS = emqttd cuttlefish brod
dep_brod = git https://github.com/klarna/brod.git 3.4.0
2. Project File Changes
1. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/Makefile, add:
DEPS += emq_plugin_kafka_brod
2. In relx.config, add these two lines to the release section:
brod,
{emq_plugin_kafka_brod, load}
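For orientation, here is a minimal sketch of what the release tuple might look like afterwards; this is not the full file, and the exact application list and version string depend on your emq-relx release:
{release, {emqttd, "2.3.5"},
 [kernel,
  stdlib,
  sasl,
  %% ...the applications already listed in your relx.config...
  brod,                          %% the Kafka client library
  {emq_plugin_kafka_brod, load}  %% load the plugin application into the release
 ]}.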
3. In /home/firecat/Prj/emq2.0/emq-relx-2.3.5/data/loaded_plugins, set the plugins that load at startup:
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka_brod.
4. The build then fails with an error:
DEP brod
make[2]: Entering directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/docopt'
../../erlang.mk:1260: warning: overriding recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
../../erlang.mk:1235: warning: ignoring old recipe for target `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/jsone'
DEP supervisor3
Error: Unknown or invalid dependency: supervisor3.
make[2]: *** [/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/supervisor3] Error 78
make[2]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod'
make[1]: *** [deps] Error 2
make[1]: Leaving directory `/home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod'
make: *** [deps] Error 2
The fix is as follows. In the Makefile of /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/brod, lines 12 and 13 read:
dep_supervisor3_commit = 1.1.5
dep_kafka_protocol_commit = 1.1.2
Change them to:
dep_supervisor3 = git https://github.com/klarna/supervisor3.git 1.1.5
dep_kafka_protocol = git https://github.com/klarna/kafka_protocol.git 1.1.2
3. Source Code Implementation (full source download: https://download.csdn.net/download/libaineu2004/10284403)
1. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/etc/emq_plugin_kafka_brod.config
[
{emq_plugin_kafka_brod, [
{kafka, [
{ bootstrap_broker, [{"127.0.0.1", 9092}] }, %% "localhost" can also be used
{ query_api_versions, false },
{ reconnect_cool_down_seconds, 10}
]}
]}
].
When the Kafka brokers form a cluster, it is recommended not to use localhost or 127.0.0.1; use the real IP addresses instead. Multiple endpoints can be listed, for example:
[{"172.16.6.170", 9092},{"172.16.6.170", 9093},{"172.16.6.170", 9094}]
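For example, the same config file pointed at a three-broker cluster could look like the sketch below (substitute your own broker addresses):
[
{emq_plugin_kafka_brod, [
{kafka, [
{ bootstrap_broker, [{"172.16.6.170", 9092}, {"172.16.6.170", 9093}, {"172.16.6.170", 9094}] },
{ query_api_versions, false },
{ reconnect_cool_down_seconds, 10}
]}
]}
].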
2. /home/firecat/Prj/emq2.0/emq-relx-2.3.5/deps/emq_plugin_kafka_brod/src/emq_plugin_kafka_brod.erl
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emq_plugin_kafka_brod).
-include_lib("emqttd/include/emqttd.hrl").
-include_lib("brod/include/brod_int.hrl").
-define(TEST_TOPIC, <<"emqtest">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application starts
load(Env) ->
brod_init([Env]),
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
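%% Note: the connected, disconnected and publish hooks below serialize their
%% event to JSON, send it to Kafka asynchronously with brod:produce/5, and then
%% wait for the #brod_produce_reply{} acknowledgement; the commented-out
%% brod:produce_sync/5 calls are the blocking alternative.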
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
Json = mochijson2:encode([
{type, <<"connected">>},
{client_id, ClientId},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
{ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
io:format("brod_produce_reply:ok ~n"),
ok
after 5000 ->
io:format("brod_produce_reply:exit ~n"),
erlang:exit(timeout)
%%ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
Json = mochijson2:encode([
{type, <<"disconnected">>},
{client_id, ClientId},
{reason, Reason},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
{ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
{ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
{ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
{ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),
Id = Message#mqtt_message.id,
From = Message#mqtt_message.from, %% the value here differs depending on whether the client is required to log in or not
Topic = Message#mqtt_message.topic,
Payload = Message#mqtt_message.payload,
Qos = Message#mqtt_message.qos,
Dup = Message#mqtt_message.dup,
Retain = Message#mqtt_message.retain,
Timestamp = Message#mqtt_message.timestamp,
ClientId = c(From),
Username = u(From),
Json = mochijson2:encode([
{type, <<"publish">>},
{client_id, ClientId},
{message, [
{username, Username},
{topic, Topic},
{payload, Payload},
{qos, i(Qos)},
{dup, i(Dup)},
{retain, i(Retain)}
]},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
{ok, CallRef} = brod:produce(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
%% Called when the plugin application stops
unload() ->
%%application:stop(brod),
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
{ok, _} = application:ensure_all_started(brod),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
%%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
%%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
%%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
ClientConfig = [],%% socket error recovery
Topic = ?TEST_TOPIC,
Partition = 0,
ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
%%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
%%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, _Username}) -> ClientId;
c(From) -> From.
u({_ClientId, Username}) -> Username;
u(From) -> From.
Please note:
(1) Before calling brod:start_producer, make sure the topic has already been created in Kafka; otherwise the call throws an exception.
Topic = ?TEST_TOPIC,
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
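If you prefer to fail with a clearer message, you can probe the topic before starting the producer. A minimal sketch, assuming brod:get_partitions_count/2 is exported by the brod version you build against:
%% Sketch: verify the topic exists before starting a producer for it.
%% brod_client_1 must already be started via brod:start_client/3.
case brod:get_partitions_count(brod_client_1, ?TEST_TOPIC) of
{ok, Count} ->
io:format("topic ~s has ~p partition(s)~n", [?TEST_TOPIC, Count]),
ok = brod:start_producer(brod_client_1, ?TEST_TOPIC, []);
{error, Reason} ->
io:format("topic ~s is not available: ~p~n", [?TEST_TOPIC, Reason])
end.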
(2) For brod:start_client, the client id can also be set up like this:
-define(CLIENT_ID, ?MODULE).
ClientId = ?CLIENT_ID,
%% Standalone: BootstrapHosts = [{"localhost", 9092}],
BootstrapHosts = [{"172.16.6.170", 9092},{"172.16.6.170", 9093},{"172.16.6.170", 9094}], %% cluster
ClientConfig = [],
ok = brod:start_client(BootstrapHosts, ClientId, ClientConfig),
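Also note that brod_init above hard-codes ClientConfig = [] even though the config file defines query_api_versions and reconnect_cool_down_seconds. A small sketch of forwarding everything except the broker list as the brod client config (assuming the emq_plugin_kafka_brod.config shown earlier):
{ok, Kafka} = application:get_env(emq_plugin_kafka_brod, kafka),
BootstrapHosts = proplists:get_value(bootstrap_broker, Kafka),
%% everything else in the kafka section becomes brod client config,
%% e.g. query_api_versions and reconnect_cool_down_seconds
ClientConfig = proplists:delete(bootstrap_broker, Kafka),
ok = brod:start_client(BootstrapHosts, brod_client_1, ClientConfig),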
3. Creating the Kafka Topic
Note:
(1) When the Kafka brokers and ZooKeeper run as a cluster, the topic must be created manually in advance, specifying the ZooKeeper node list; it should also be created for a standalone setup, otherwise the client will report errors.
(2) Avoid the characters "." and "_" in topic names.
(3) Create the topic:
./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest
(4) Start a consumer to verify:
./bin/kafka-console-consumer.sh --zookeeper 172.16.6.170:2181 --topic emqtest --from-beginning
4. In the examples above the partition is hard-coded as Partition = 0. How can the client choose a partition itself? The Erlang brod library does not implement automatic partitioning for us, so we compute a hash value ourselves.
-define(NUM_PARTITIONS, 3).
-define(EMPTY(S), (S == <<"">> orelse S == undefined)).
getPartition(ClientId) when ?EMPTY(ClientId) ->
crypto:rand_uniform(0, ?NUM_PARTITIONS);
getPartition(ClientId) ->
<<_NodeD01, _NodeD02, _NodeD03, _NodeD04, _NodeD05,
_NodeD06, _NodeD07, _NodeD08, _NodeD09, _NodeD10,
_NodeD11, _NodeD12, _NodeD13, _NodeD14, _NodeD15,
NodeD16>> = Val = crypto:hash(md5, ClientId), %% an MD5 digest is 16 bytes
io:format("Value is ~w~n", [Val]),
NodeD16 rem ?NUM_PARTITIONS. %% take the remainder
A concrete usage example:
ClientId = <<"123456">>,
Partition = getPartition(ClientId),
brod:produce(?KAFKA_CLIENT1, ?EMQ_TOPIC_ONLINE, Partition, ClientId, list_to_binary(Json)).
My scheme here (take the MD5 of the clientId, then the remainder) is a simple home-grown one; it is not the native Java hashCode implementation. For that, see the hashCode source of java.lang.String.
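For comparison, here is a minimal Erlang sketch of that Java-style computation: java.lang.String.hashCode over the key bytes (so it only matches Java exactly for ASCII keys), followed by masking to a non-negative value, assuming Kafka's Utils.abs behaves like n & 0x7fffffff. This is an illustration only, not part of the plugin source:
%% h = s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1], truncated to 32 bits
java_hash_code(Bin) when is_binary(Bin) ->
lists:foldl(fun(Byte, H) -> (H * 31 + Byte) band 16#FFFFFFFF end,
0, binary_to_list(Bin)).
%% keyed partition selection: abs(hashCode) rem NumPartitions
java_style_partition(Key, NumPartitions) ->
(java_hash_code(Key) band 16#7FFFFFFF) rem NumPartitions.
For example, java_style_partition(<<"123456">>, ?NUM_PARTITIONS) returns a stable partition number for that client id, just as the MD5 scheme above does, only with different hash values.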
Kafka's official partitioning approach is attached below for reference.
Message-to-partition assignment:
By default, Kafka assigns a partition based on the message key, i.e. hash(key) % numPartitions, as shown in the code below:
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
This guarantees that messages with the same key are always routed to the same partition. If you do not specify a key, how does Kafka decide which partition the message goes to?
if(key == null) { // if no key was specified
val id = sendPartitionPerTopicCache.get(topic) // check whether Kafka already has a cached partition id
id match {
case Some(partitionId) =>
partitionId // if so, just use that partition id
case None => // if not,
val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) // find all partitions whose leader broker is available
if (availablePartitions.isEmpty)
throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
val index = Utils.abs(Random.nextInt) % availablePartitions.size // pick one of them at random
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // update the cache for direct reuse next time
partitionId
}
}
As you can see, for messages without a key Kafka essentially picks a random partition and caches the chosen partition id for later use; Kafka also clears this cache itself (by default every 10 minutes, or whenever topic metadata is requested).
Full source download: https://download.csdn.net/download/libaineu2004/10284403