
Create a Kafka Result Table


The DDL required to define a Kafka result table is as follows.

    create table sink_kafka (
      messageKey VARBINARY,
      `message` VARBINARY,
      PRIMARY KEY (messageKey)
    ) with (
      type = 'kafka010',
      topic = 'XXX',
      `group.id` = 'XXX',
      bootstrap.servers = 'ip1:port,ip2:port,ip3:port'
    );

Note:
  1. When creating a Kafka result table, you must explicitly specify PRIMARY KEY (messageKey).
  2. Whether you use Alibaba Cloud Kafka or a self-managed Kafka cluster, Realtime Compute currently reports no metrics such as TPS or RPS. After a job goes live, the operations dashboard does not yet display these metrics.
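Each row written to the result table becomes one Kafka record: the `messageKey` column supplies the record key and `message` supplies the record value, both as raw bytes. A minimal Python sketch of that mapping (the `row_to_record` helper is illustrative, not part of the connector):

```python
def row_to_record(row):
    """Map a result-table row (messageKey, message) to a Kafka (key, value) pair.

    Both columns are VARBINARY, so they pass through as raw bytes.
    """
    message_key, message = row
    return (bytes(message_key), bytes(message))

# Example: a row with a binary key and a JSON payload as the value
key, value = row_to_record((b"user-42", b'{"event":"click"}'))
```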

Common configuration

| Parameter | Description | Remarks |
| --- | --- | --- |
| type | Corresponding Kafka version | Required. Must be one of KAFKA08, KAFKA09, KAFKA010, or KAFKA011; see the version table below for the correspondence. |
| topic | Topic to write to | Topic name |

Required configuration

Required for kafka08:

| Parameter | Description | Remarks |
| --- | --- | --- |
| group.id | N/A | Consumer group ID |
| zookeeper.connect | ZooKeeper connection address | ZooKeeper connection ID |

Required for kafka09 / kafka010 / kafka011:

| Parameter | Description |
| --- | --- |
| bootstrap.servers | Kafka cluster address |

Kafka cluster address:

If your Kafka is the Alibaba Cloud commercial edition, see the Kafka commercial edition preparation document.

If your Kafka is the Alibaba Cloud public preview edition, see the Kafka public preview edition preparation document.
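The `bootstrap.servers` value is a comma-separated list of `host:port` entries. As a sanity check on that format, a small Python sketch (the `parse_bootstrap_servers` helper is hypothetical, not part of the connector):

```python
def parse_bootstrap_servers(value):
    """Split a 'host1:port1,host2:port2' string into (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        # rpartition tolerates hostnames; it splits on the last ':' only
        host, _, port = entry.strip().rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"malformed server entry: {entry!r}")
        servers.append((host, int(port)))
    return servers

print(parse_bootstrap_servers("192.168.116.8:9092,192.168.116.9:9092"))
```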

Optional configuration parameters

    "consumer.id", "socket.timeout.ms", "fetch.message.max.bytes", "num.consumer.fetchers", "auto.commit.enable", "auto.commit.interval.ms", "queued.max.message.chunks", "rebalance.max.retries", "fetch.min.bytes", "fetch.wait.max.ms", "rebalance.backoff.ms", "refresh.leader.backoff.ms", "auto.offset.reset", "consumer.timeout.ms", "exclude.internal.topics", "partition.assignment.strategy", "client.id", "zookeeper.session.timeout.ms", "zookeeper.connection.timeout.ms", "zookeeper.sync.time.ms", "offsets.storage", "offsets.channel.backoff.ms", "offsets.channel.socket.timeout.ms", "offsets.commit.max.retries", "dual.commit.enabled", "socket.receive.buffer.bytes"

Choose among these parameters according to your actual business needs.
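If you assemble these options programmatically, one way to guard against typos is to keep only recognized keys. A Python sketch under that assumption (the whitelist below is an abbreviated subset of the list above, and the helper name is illustrative):

```python
# Abbreviated subset of the optional consumer settings listed above.
OPTIONAL_KEYS = {
    "consumer.id", "socket.timeout.ms", "fetch.message.max.bytes",
    "auto.commit.enable", "auto.offset.reset", "client.id",
}

def filter_optional(options):
    """Drop any option that is not a recognized optional consumer setting."""
    return {k: v for k, v in options.items() if k in OPTIONAL_KEYS}

print(filter_optional({"auto.offset.reset": "earliest", "bogus": "x"}))
```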

For other optional settings, refer to the official Kafka documentation:

Kafka09: https://kafka.apache.org/090/documentation.html#newconsumerconfigs

Kafka010: https://kafka.apache.org/0102/documentation.html#newconsumerconfigs

Kafka011: https://kafka.apache.org/0110/documentation.html#consumerconfigs

| type | Kafka version |
| --- | --- |
| Kafka08 | 0.8.22 |
| Kafka09 | 0.9.0.1 |
| Kafka010 | 0.10.2.1 |
| Kafka011 | 0.11.0.2 |
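The type-to-version correspondence can also be expressed as a simple lookup. A Python sketch (the function name is illustrative; version strings are copied from the table above):

```python
# type value in the WITH clause -> Kafka broker version (from the table above)
KAFKA_VERSIONS = {
    "kafka08": "0.8.22",
    "kafka09": "0.9.0.1",
    "kafka010": "0.10.2.1",
    "kafka011": "0.11.0.2",
}

def broker_version(connector_type):
    """Return the broker version for a connector type, case-insensitively."""
    return KAFKA_VERSIONS[connector_type.lower()]

print(broker_version("KAFKA010"))  # → 0.10.2.1
```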

A complete example, which writes randomly generated test data to Kafka:

    create table datahub_input (
      messageKey VARBINARY,
      `message` VARBINARY
    ) with (type = 'random');

    create table sink_kafka (
      messageKey VARBINARY,
      `message` VARBINARY,
      PRIMARY KEY (messageKey)
    ) with (
      type = 'kafka010',
      topic = 'kafka_sink',
      `group.id` = 'CID_kafka_sink',
      bootstrap.servers = '192.168.116.8:9092,192.168.116.9:9092,192.168.116.10:9092'
    );

    INSERT INTO sink_kafka
    SELECT * FROM datahub_input;

This article was reposted from Realtime Compute ——