@羲凡 - for a better life
Flink: reading from Kafka and writing to Pulsar (including the error "The implementation of the RichSinkFunction is not serializable")
Before you start, check your Kafka, Pulsar, and Flink versions and add the matching dependencies to your pom.
This article uses Kafka 0.11, Pulsar 2.4.1, and Flink 1.7.2; with different versions the code will differ, so mind your own versions.
1. Preparation
a. Add the Maven dependencies
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-flink</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
b. Create a Kafka topic and start a console producer
kafka-topics --create --zookeeper cdh01.com:2181,cdh02.com:2181,cdh03.com:2181 \
--replication-factor 1 --partitions 3 --topic testtopic
kafka-console-producer --topic testtopic \
--broker-list cdh01.com:9092,cdh02.com:9092,cdh03.com:9092
c. Create a non-partitioned topic (tp3) and a partitioned topic (tp4)
pulsar-admin topics create persistent://test-tenant/ns1/tp3
pulsar-admin topics create-partitioned-topic \
persistent://test-tenant/ns1/tp4 --partitions 3
d. Start a Pulsar consumer
pulsar-client consume persistent://test-tenant/ns1/tp3 \
-n 100 -s "consumer-test" -t "Exclusive"
2. The code
package pulsar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class FlinkConsumeKafkaWritePulsar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh01.com:9092,cdh02.com:9092,cdh03.com:9092");
        properties.setProperty("group.id", "test227");
        String pulsarUrl = "pulsar://cdh01.com:6650,cdh02.com:6650,cdh03.com:6650";

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "testtopic",
                new SimpleStringSchema(),
                properties);
        consumer.setStartFromLatest();

        DataStreamSource<String> text = env.addSource(consumer, "Kafka");
        DataStream<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] arr = str.split(" ");
                for (String s : arr) {
                    collector.collect(new Tuple2<>(s, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(3)).sum(1);
        sum.print();

        // This constructor can only write to a non-partitioned topic
        // sum.addSink(new FlinkPulsarProducer<>(
        //         pulsarUrl,
        //         "persistent://test-tenant/ns1/tp3",
        //         new AuthenticationDisabled(),
        //         in -> in.toString().getBytes(StandardCharsets.UTF_8),
        //         in -> in.f0
        // ));

        // This constructor can write to both non-partitioned and partitioned topics
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(pulsarUrl);
        clientConf.setAuthentication(new AuthenticationDisabled());
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        producerConf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
        producerConf.setTopicName("persistent://test-tenant/ns1/tp4");
        sum.addSink(new FlinkPulsarProducer<>(
                clientConf,
                producerConf,
                in -> in.toString().getBytes(StandardCharsets.UTF_8),
                in -> in.f0
        ));

        env.execute();
    }
}
3. Notes
Again, double-check your Kafka, Pulsar, and Flink versions and add the matching dependencies to your pom.
For details on the Kafka connector dependency, see the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html
4. Fixing the error
If you are using Pulsar 2.4.1, you will hit the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
at pulsar.FlinkConsumeKafkaWritePulsar.main(FlinkConsumeKafkaWritePulsar.java:48)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 4 more
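The root cause: before shipping a sink to the cluster, Flink Java-serializes it (the ClosureCleaner step in the stack trace), and in Pulsar 2.4.1 DefaultBatcherBuilder does not implement Serializable. The failing check can be reproduced with plain Java serialization; in this minimal sketch, MySink and NonSerializableBuilder are made-up stand-ins for FlinkPulsarProducer and DefaultBatcherBuilder:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCheck {
    // Stand-in for Pulsar 2.4.1's DefaultBatcherBuilder: a type that does NOT implement Serializable
    static class NonSerializableBuilder {
    }

    // Stand-in for the sink: Serializable itself, but holding a non-serializable field
    static class MySink implements Serializable {
        private final NonSerializableBuilder builder = new NonSerializableBuilder();
    }

    // Mimics the check Flink performs before shipping a function to the cluster
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(javaSerializable(new MySink()));   // false: same failure as above
        System.out.println(javaSerializable("plain string")); // true
    }
}
```

Marking the field transient or making its type Serializable are the two general ways out; the Pulsar fix below takes the latter route.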
The fix comes from this upstream commit (following it directly also works):
https://github.com/apache/pulsar/commit/6a67ae094b5ecfa1a4602b8f7baff9a838b44e23#diff-d4876cc56bcbd7b0fe549311203cd213
If you are new to this, just follow the steps below.
Steps:
1. Create the package org.apache.pulsar.client.api in your project, add an interface named BatcherBuilder, and copy in the following:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import org.apache.pulsar.client.internal.DefaultImplementation;
import java.io.Serializable;
/**
* Batcher builder
*/
public interface BatcherBuilder extends Serializable {

    /**
     * Default batch message container
     *
     * incoming single messages:
     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
     *
     * batched into single batch message:
     * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
     */
    BatcherBuilder DEFAULT = DefaultImplementation.newDefaultBatcherBuilder();

    /**
     * Key based batch message container
     *
     * incoming single messages:
     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
     *
     * batched into multiple batch messages:
     * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
     */
    BatcherBuilder KEY_BASED = DefaultImplementation.newKeyBasedBatcherBuilder();

    /**
     * Build a new batch message container.
     * @return new batch message container
     */
    BatchMessageContainer build();
}
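The two constants in the interface differ only in how incoming messages are grouped into batches: DEFAULT keeps all messages in one batch in arrival order, while KEY_BASED produces one batch per key. A plain-Java sketch of the KEY_BASED grouping (illustration only, not Pulsar code; the names here are made up):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchingSemantics {
    // KEY_BASED batching: collect the values of each key into its own batch
    static Map<String, List<String>> keyBased(List<String[]> msgs) {
        Map<String, List<String>> batches = new TreeMap<>();
        for (String[] kv : msgs) {
            batches.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String[]> msgs = Arrays.asList(
                new String[]{"k1", "v1"}, new String[]{"k2", "v1"}, new String[]{"k3", "v1"},
                new String[]{"k1", "v2"}, new String[]{"k2", "v2"}, new String[]{"k3", "v2"});
        // DEFAULT would put all six messages into a single batch in arrival order;
        // KEY_BASED yields one batch per key:
        System.out.println(keyBased(msgs)); // {k1=[v1, v2], k2=[v1, v2], k3=[v1, v2]}
    }
}
```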
2. Create the package org.apache.pulsar.client.impl, add a class named DefaultBatcherBuilder, and copy in the following:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.BatcherBuilder;
public class DefaultBatcherBuilder implements BatcherBuilder {

    private static final long serialVersionUID = 1L;

    public DefaultBatcherBuilder() {
    }

    public BatchMessageContainer build() {
        return new BatchMessageContainerImpl();
    }
}
3. In the same org.apache.pulsar.client.impl package, add a class named KeyBasedBatcherBuilder and copy in the following:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.BatcherBuilder;
public class KeyBasedBatcherBuilder implements BatcherBuilder {

    private static final long serialVersionUID = 1L;

    public KeyBasedBatcherBuilder() {
    }

    public BatchMessageContainer build() {
        return new BatchMessageKeyBasedContainer();
    }
}
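Why these classes fix the error: because they sit earlier on the classpath than the Pulsar jar, they shadow the originals, and the replacement BatcherBuilder interface extends Serializable while the implementations declare a serialVersionUID, so Java-serializing the sink now succeeds. A self-contained sketch of the "after" state (all class names here are made-up stand-ins mirroring the patched classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FixedBuilderCheck {
    // Mirrors the patched BatcherBuilder: the interface itself extends Serializable
    interface BatcherBuilderLike extends Serializable {
        Object build();
    }

    // Mirrors the patched DefaultBatcherBuilder: a concrete class with a serialVersionUID
    static class DefaultBuilderLike implements BatcherBuilderLike {
        private static final long serialVersionUID = 1L;
        public Object build() { return new Object(); }
    }

    // Stands in for the sink holding a builder field, like FlinkPulsarProducer
    static class SinkLike implements Serializable {
        private final BatcherBuilderLike builder = new DefaultBuilderLike();
    }

    // The same Java-serialization check Flink runs before deploying the sink
    static boolean serializes(Object o) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new SinkLike())); // true: no more NotSerializableException
    }
}
```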
====================================================================
If you have any questions about this post, feel free to leave a comment.