
《kafka中文手冊》-快速開始(三)

Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data.

Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file.

First, we'll start by creating some seed data to test with:
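For example (any couple of lines will do; the file name test.txt matches the one the sample source connector reads, as described below), run from the root of the Kafka distribution:

echo -e "foo\nbar" > test.txt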

Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.

These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic, and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file.
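A sketch of the launch command, assuming the sample property files keep the names they ship with in the Kafka distribution (one worker config plus one file each for the source and sink connectors):

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties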

During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from test.txt and producing them to the topic connect-test, and the sink connector should start reading messages from the topic connect-test and writing them to the file test.sink.txt. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:
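For example, assuming the sink connector writes test.sink.txt into the directory Connect was started from:

cat test.sink.txt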

Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):
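A sketch of the console consumer invocation, assuming a single broker listening on localhost:9092:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning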

The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
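For example, appending one more line to the source file:

echo "Another line" >> test.txt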

You should see the line appear in the console consumer output and in the sink file.

Kafka Streams is a client library of Kafka for real-time stream processing and analyzing data stored in Kafka brokers. This quickstart example will demonstrate how to run a streaming application coded with this library. Here is the gist of the WordCountDemo example code (converted to use Java 8 lambda expressions for easy reading).

It implements the WordCount algorithm, which computes a word occurrence histogram from the input text. However, unlike other WordCount examples you might have seen before that operate on bounded data, the WordCount demo application behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data. Similar to the bounded variant, it is a stateful algorithm that tracks and updates the counts of words. However, since it must assume potentially unbounded input data, it will periodically output its current state and results while continuing to process more data, because it cannot know when it has processed “all” the input data.

We will now prepare input data to a Kafka topic, which will subsequently be processed by a Kafka Streams application.
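A sketch of the input file creation; the file name file-input.txt and the sample sentences are arbitrary, as long as the console producer step below reads the same file:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt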

(On Windows, the same file can be created with equivalent echo commands.)

Next, we send this input data to the input topic named streams-file-input using the console producer (in practice, stream data will likely be flowing continuously into Kafka while the application is up and running):
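A sketch of this step, assuming a local broker on localhost:9092 and ZooKeeper on localhost:2181; the topic is created first, then the file is piped into the console producer:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-file-input
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt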

We can now run the WordCount demo application to process the input data:
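The demo class ships with the Kafka distribution; a sketch of the launch, assuming the 0.10.x package layout and class name:

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo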

There won't be any STDOUT output except log entries, as the results are continuously written back into another topic named streams-wordcount-output in Kafka. The demo will run for a few seconds and then, unlike typical stream processing applications, terminate automatically.

We can now inspect the output of the WordCount demo application by reading from its output topic:
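A sketch of reading the output topic; since the record key is a string and the count is a long, the console consumer needs matching deserializers (formatter and deserializer class names assumed from the 0.10.x clients):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer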

With the following output data being printed to the console:

Here, the first column is the Kafka message key and the second column is the message value, both in java.lang.String format. Note that the output is actually a continuous stream of updates, where each data record (i.e. each line in the original output above) is an updated count of a single word, aka record key such as “kafka”. For multiple records with the same key, each later record is an update of the previous one.

Now you can write more input messages to the streams-file-input topic and observe additional messages added to the streams-wordcount-output topic, reflecting updated word counts (e.g., using the console producer and the console consumer, as described above).
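For example, one more line piped straight into the input topic through the console producer used above:

echo "hello kafka streams" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input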

You can stop the console consumer via Ctrl-C.

Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients (i.e. 0.10.1.x clients only support 0.10.1.x or later brokers, while 0.10.1.x brokers also support older clients).

For a rolling upgrade:

Update server.properties file on all brokers and add the following properties:

inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2.0, 0.9.0.0 or 0.10.0.0).

Upgrade the brokers one at a time: shut down the broker, update the code, and restart it.

Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.1.0.

If your previous message format is 0.10.0, change log.message.format.version to 0.10.1 (this is a no-op as the message format is the same for both 0.10.0 and 0.10.1). If your previous message format version is lower than 0.10.0, do not change log.message.format.version yet – this parameter should only change once all consumers have been upgraded to 0.10.0.0 or later.

Restart the brokers one by one for the new protocol version to take effect.

If log.message.format.version is still lower than 0.10.0 at this point, wait until all consumers have been upgraded to 0.10.0 or later, then change log.message.format.version to 0.10.1 on each broker and restart them one by one.

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
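A sketch of the first step on one broker, assuming the stock config/server.properties path and a cluster currently running 0.10.0.0 (substitute your real current version); log.message.format.version can be pinned the same way, and is only changed later as the notes above describe:

# run on each broker before restarting it with the new code
echo "inter.broker.protocol.version=0.10.0.0" >> config/server.properties
# once the whole cluster is upgraded, edit the value to 0.10.1.0 and do one more rolling restart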

The log retention time is no longer based on the last modified time of the log segments. Instead it is based on the largest timestamp of the messages in a log segment.

The number of open file handles will increase by ~33% compared to 0.10.0 because of the addition of time index files for each segment.

The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potential frequent log rolling.

Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time.

The new Java consumer is no longer in beta and we recommend it for all new development. The old Scala consumers are still supported, but they will be deprecated in the next release and will be removed in a future major release.

The --new-consumer/--new.consumer switch is no longer required to use tools like MirrorMaker and the console consumer with the new consumer; one simply needs to pass a Kafka broker to connect to instead of the ZooKeeper ensemble. In addition, usage of the console consumer with the old consumer has been deprecated and it will be removed in a future major release.

Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface.

The BrokerState “RunningAsController” (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric.

The new Java consumer now allows users to search offsets by timestamp on partitions.

When using an authorizer and a user doesn't have Describe authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors to requests since this leaks topic names. Instead, the UNKNOWN_TOPIC_OR_PARTITION error code will be returned. This may cause unexpected timeouts or delays when using the producer and consumer since Kafka clients will typically retry automatically on unknown topic errors. You should consult the client logs if you suspect this could be happening.

Fetch responses have a size limit by default (50 MB for consumers and 10 MB for replication). The existing per-partition limits also apply (1 MB for consumers and replication). Note that neither of these limits is an absolute maximum, as explained in the next point.

Consumers and replicas can make progress if a message larger than the response/partition size limit is found. More concretely, if the first message in the first non-empty partition of the fetch is larger than either or both limits, the message will still be returned.

Overloaded constructors were added to kafka.api.FetchRequest and kafka.javaapi.FetchRequest to allow the caller to specify the order of the partitions (since order is significant in v3). The previously existing constructors were deprecated and the partitions are shuffled before the request is sent to avoid starvation issues.

ListOffsetRequest v1 supports accurate offset search based on timestamps.

MetadataResponse v2 introduces a new field: “cluster_id”.

FetchRequest v3 supports limiting the response size (in addition to the existing per-partition limit), it returns messages bigger than the limits if required to make progress, and the order of partitions in the request is now significant.

JoinGroup v1 introduces a new field: “rebalance_timeout”.

Note: Because new protocols are introduced, it is important to upgrade your Kafka clusters before upgrading your clients.

Notes to clients with version 0.9.0.0: Due to a bug introduced in 0.9.0.0, clients that depend on ZooKeeper (old Scala high-level consumer and MirrorMaker if used with the old consumer) will not work with 0.10.0.x brokers. Therefore, 0.9.0.0 clients should be upgraded to 0.9.0.1 before brokers are upgraded to 0.10.0.x. This step is not necessary for 0.8.x or 0.9.0.1 clients.

inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2 or 0.9.0.0).

Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.

Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.10.0.0. Note: You shouldn't touch log.message.format.version yet – this parameter should only change once all consumers have been upgraded to 0.10.0.0.

Once all consumers have been upgraded to 0.10.0, change log.message.format.version to 0.10.0 on each broker and restart them one by one.

The message format in 0.10.0 includes a new timestamp field and uses relative offsets for compressed messages. The on-disk message format can be configured through log.message.format.version in the server.properties file. The default on-disk message format is 0.10.0. If a consumer client is on a version before 0.10.0.0, it only understands message formats before 0.10.0. In this case, the broker is able to convert messages from the 0.10.0 format to an earlier format before sending the response to the consumer on an older version. However, the broker can't use zero-copy transfer in this case. Reports from the Kafka community on the performance impact have shown CPU utilization going from 20% before to 100% after an upgrade, which forced an immediate upgrade of all clients to bring performance back to normal. To avoid such message conversion before consumers are upgraded to 0.10.0.0, one can set log.message.format.version to 0.8.2 or 0.9.0 when upgrading the broker to 0.10.0.0. This way, the broker can still use zero-copy transfer to send the data to the old consumers. Once consumers are upgraded, one can change the message format to 0.10.0 on the broker and enjoy the new message format that includes the new timestamp and improved compression. The conversion is supported to ensure compatibility and can be useful to support a few apps that have not updated to newer clients yet, but is impractical to support all consumer traffic on even an overprovisioned cluster. Therefore, it is critical to avoid the message conversion as much as possible when brokers have been upgraded but the majority of clients have not.

For clients that are upgraded to 0.10.0.0, there is no performance impact.

Note: By setting the message format version, one certifies that all existing messages are on or below that message format version. Otherwise consumers before 0.10.0.0 might break. In particular, after the message format is set to 0.10.0, one should not change it back to an earlier format as it may break consumers on versions before 0.10.0.0.

Note: Due to the additional timestamp introduced in each message, producers sending small messages may see a message throughput degradation because of the increased overhead. Likewise, replication now transmits an additional 8 bytes per message. If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards and see failures and performance issues due to the overload.

Starting from Kafka 0.10.0.0, the message format version in Kafka is represented as the Kafka version. For example, message format 0.9.0 refers to the highest message version supported by Kafka 0.9.0.

Message format 0.10.0 has been introduced and it is used by default. It includes a timestamp field in the messages and relative offsets are used for compressed messages.

ProduceRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0.

FetchRequest/Response v2 has been introduced and it is used by default to support message format 0.10.0.

The MessageFormatter interface was changed from def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) to def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream).

The MessageReader interface was changed from def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] to def readMessage(): ProducerRecord[Array[Byte], Array[Byte]].

MessageFormatter's package was changed from kafka.tools to kafka.common.

MessageReader's package was changed from kafka.tools to kafka.common.

MirrorMakerMessageHandler no longer exposes the handle(record: MessageAndMetadata[Array[Byte], Array[Byte]]) method as it was never called.

The 0.7 KafkaMigrationTool is no longer packaged with Kafka. If you need to migrate from 0.7 to 0.10.0, please migrate to 0.8 first and then follow the documented upgrade process to upgrade from 0.8 to 0.10.0.

The new consumer has standardized its APIs to accept java.util.Collection as the sequence type for method parameters. Existing code may have to be updated to work with the 0.10.0 client library.

The default value of the configuration parameter receive.buffer.bytes is now 64K for the new consumer.

The new consumer now exposes the configuration parameter exclude.internal.topics to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.

The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible.

The new consumer API has been marked stable.

Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X

Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.

Restart the brokers one by one for the new protocol version to take effect.

Java 1.6 is no longer supported.

Scala 2.9 is no longer supported.

The configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync.

Compacted topics no longer accept messages without a key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without a key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics).

MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration.

Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual; only custom code directly importing these classes will be affected.

The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh.

The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with a non-zero exit code on failure.

The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision.

The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the Java producer instead of the old Scala producer by default, and users have to specify 'old-producer' to use the old producer.

By default, all command line tools will print all logging messages to stderr instead of stdout.

The configuration parameter log.cleaner.enable is now true by default. This means topics with cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics.

The default value of the configuration parameter fetch.min.bytes for the new consumer is now 1.

Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.

The kafka-consumer-offset-checker.sh script (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.

The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class).

0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.

0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.