The DDL required to define a Kafka result (sink) table is as follows.
```sql
create table sink_kafka (
  messageKey VARBINARY,
  `message` VARBINARY,
  PRIMARY KEY (messageKey)
) with (
  type = 'kafka010',
  topic = 'XXX',
  `group.id` = 'XXX',
  bootstrap.servers = 'ip1:port,ip2:port,ip3:port'
);
```
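Both columns of the sink are VARBINARY, so any upstream string data has to be turned into raw bytes before it reaches the table. A minimal sketch of that encoding step, assuming UTF-8 string data (the helper name and sample values are illustrative, not part of the connector):

```python
# Illustration only: the sink's messageKey / `message` columns are
# VARBINARY, i.e. raw byte arrays. A UTF-8 encoding is assumed here.

def to_kafka_record(key: str, value: str):
    """Encode a (key, value) string pair into the byte pair that the
    messageKey and `message` VARBINARY columns expect."""
    return key.encode("utf-8"), value.encode("utf-8")

if __name__ == "__main__":
    message_key, message = to_kafka_record("user-42", '{"event": "click"}')
    print(type(message_key).__name__, type(message).__name__)  # prints: bytes bytes
```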
Note:
- When creating a Kafka result table, you must explicitly specify PRIMARY KEY (messageKey).
- Whether you use Alibaba Cloud Kafka or a self-managed Kafka cluster, Realtime Compute currently provides no metrics such as TPS or RPS. After a job goes live, the operations page does not yet display these metrics.
General parameters

Parameter | Description | Remarks
---|---|---
type | Kafka version | Required. Must be one of KAFKA08, KAFKA09, KAFKA010, or KAFKA011; see the version mapping table below.
topic | Topic to write to | Topic name
Required parameters

Required for kafka08:

Parameter | Description | Remarks
---|---|---
group.id | N/A | Consumer group ID
zookeeper.connect | ZooKeeper connection address | ZooKeeper connection ID

Required for kafka09/kafka010/kafka011:

Parameter | Description
---|---
bootstrap.servers | Kafka cluster address
Kafka cluster address:
- If your Kafka is the Alibaba Cloud commercial edition, see the Kafka commercial edition preparation document.
- If your Kafka is the Alibaba Cloud public preview edition, see the Kafka public preview edition preparation document.

Optional parameters
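The `bootstrap.servers` value follows the `host1:port,host2:port,...` shape shown in the DDL above. A minimal sketch of validating that format before putting it in the WITH clause (the helper name and regex are hypothetical, not part of the connector):

```python
import re

# Hypothetical helper: checks that a bootstrap.servers value follows the
# 'host1:port,host2:port,...' shape used in the DDL above.
_HOST_PORT = re.compile(r"^[\w.\-]+:\d{1,5}$")

def parse_bootstrap_servers(value: str) -> list:
    """Split a bootstrap.servers string and validate each host:port entry."""
    servers = [s.strip() for s in value.split(",") if s.strip()]
    bad = [s for s in servers if not _HOST_PORT.match(s)]
    if bad:
        raise ValueError(f"malformed host:port entries: {bad}")
    return servers

if __name__ == "__main__":
    print(parse_bootstrap_servers("192.168.116.8:9092,192.168.116.9:9092"))
```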
"consumer.id","socket.timeout.ms","fetch.message.max.bytes","num.consumer.fetchers","auto.commit.enable","auto.commit.interval.ms","queued.max.message.chunks","rebalance.max.retries","fetch.min.bytes","fetch.wait.max.ms","rebalance.backoff.ms","refresh.leader.backoff.ms","auto.offset.reset","consumer.timeout.ms","exclude.internal.topics","partition.assignment.strategy","client.id","zookeeper.session.timeout.ms","zookeeper.connection.timeout.ms","zookeeper.sync.time.ms","offsets.storage","offsets.channel.backoff.ms","offsets.channel.socket.timeout.ms","offsets.commit.max.retries","dual.commit.enabled","socket.receive.buffer.bytes"

Select from the parameters above according to your actual business needs.
For other optional parameters, refer to the official Kafka documentation:

- Kafka09: https://kafka.apache.org/090/documentation.html#newconsumerconfigs
- Kafka010: https://kafka.apache.org/0102/documentation.html#newconsumerconfigs
- Kafka011: https://kafka.apache.org/0110/documentation.html#consumerconfigs
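The optional parameters above are layered on top of the required block in the WITH clause. A sketch of how that merge behaves, with the parameter names taken from the tables in this document (the default values and helper name are illustrative):

```python
# Required settings for a kafka010 sink, as shown in the DDL above.
REQUIRED = {
    "type": "kafka010",
    "topic": "XXX",
    "group.id": "XXX",
    "bootstrap.servers": "ip1:port,ip2:port,ip3:port",
}

def build_sink_properties(optional=None) -> dict:
    """Merge user-chosen optional parameters with the required block;
    optional keys never override required ones."""
    props = dict(optional or {})
    props.update(REQUIRED)
    return props

if __name__ == "__main__":
    props = build_sink_properties({"auto.offset.reset": "latest"})
    print(sorted(props))
```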
Kafka | Version
---|---
Kafka08 | 0.8.22
Kafka09 | 0.9.0.1
Kafka010 | 0.10.2.1
Kafka011 | 0.11.0.2
```sql
create table datahub_input (
  messageKey VARBINARY,
  `message` VARBINARY
) with (type = 'random');

create table sink_kafka (
  messageKey VARBINARY,
  `message` VARBINARY,
  PRIMARY KEY (messageKey)
) with (
  type = 'kafka010',
  topic = 'kafka_sink',
  `group.id` = 'CID_kafka_sink',
  bootstrap.servers = '192.168.116.8:9092,192.168.116.9:9092,192.168.116.10:9092'
);

INSERT INTO sink_kafka
SELECT * FROM datahub_input;
```
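Conceptually, the job above is a pass-through: the `random` source emits arbitrary byte pairs, and `INSERT INTO ... SELECT *` forwards each row unchanged into the Kafka sink. A minimal sketch of that data flow, assuming random 8-byte keys and 16-byte payloads (function names are illustrative, not connector APIs):

```python
import os

def random_source(rows: int):
    """Mimic the datahub_input random source: yield VARBINARY pairs."""
    for _ in range(rows):
        yield os.urandom(8), os.urandom(16)

def run_job(rows: int) -> list:
    """Forward every source row to the sink unchanged (SELECT *)."""
    sink = []
    for message_key, message in random_source(rows):
        sink.append((message_key, message))
    return sink
```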
This article is reproduced from Realtime Compute——