Flink: Reading Kafka Data and Writing to Pulsar (including the error "The implementation of the RichSinkFunction is not serializable")

@羲凡 - Just to live a better life


Before you start, be sure to check your Kafka, Pulsar, and Flink versions and add the matching dependencies to your pom.

I am using Kafka 0.11, Pulsar 2.4.1, and Flink 1.7.2. Keep in mind that if your versions differ, the code will differ as well.

1. Preparation

a. Add the Maven dependencies

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-flink</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
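The snippet above assumes the ${pulsar.version} and ${flink.version} properties are defined elsewhere in the pom. With the versions used in this post, the matching properties block would be:

<properties>
    <pulsar.version>2.4.1</pulsar.version>
    <flink.version>1.7.2</flink.version>
</properties>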

b. Create a Kafka topic and start a console producer

kafka-topics --create --zookeeper cdh01.com:2181,cdh02.com:2181,cdh03.com:2181 \
--replication-factor 1 --partitions 3 --topic testtopic
           
kafka-console-producer --topic testtopic \
--broker-list cdh01.com:9092,cdh02.com:9092,cdh03.com:9092
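The job in section 2 splits each line on spaces, so type space-separated words into this producer, for example: flink pulsar flink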

c. Create a non-partitioned topic (tp3) and a partitioned topic (tp4)
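If the test-tenant tenant or the ns1 namespace does not exist yet, create them first (depending on your cluster setup you may need extra options, such as allowed clusters):

pulsar-admin tenants create test-tenant
pulsar-admin namespaces create test-tenant/ns1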

pulsar-admin topics create persistent://test-tenant/ns1/tp3
           
pulsar-admin topics create-partitioned-topic \
persistent://test-tenant/ns1/tp4 --partitions 3

d. Start a Pulsar consumer

pulsar-client consume persistent://test-tenant/ns1/tp3 \
-n 100 -s "consumer-test" -t "Exclusive"
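Here -n 100 caps the number of messages to consume, -s sets the subscription name, and -t sets the subscription type.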

2. Implementation

package pulsar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class FlinkConsumeKafkaWritePulsar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5 seconds

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "cdh01.com:9092,cdh02.com:9092,cdh03.com:9092");
        properties.setProperty("group.id", "test227");

        String pulsarUrl = "pulsar://cdh01.com:6650,cdh02.com:6650,cdh03.com:6650";

        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
                "testtopic", 
                new SimpleStringSchema(), 
                properties);
        consumer.setStartFromLatest();

        DataStreamSource<String> text = env.addSource(consumer, "Kafka");
        // Word count: split each line on spaces, count per word in 3-second tumbling windows
        DataStream<Tuple2<String, Integer>> sum = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String str, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] arr = str.split(" ");
                for (String s : arr) {
                    collector.collect(new Tuple2<>(s, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(3)).sum(1);
        sum.print();

        // This constructor can only write to a non-partitioned topic
//        sum.addSink(new FlinkPulsarProducer<>(
//                pulsarUrl,
//                "persistent://test-tenant/ns1/tp3",
//                new AuthenticationDisabled(),
//                in -> in.toString().getBytes(StandardCharsets.UTF_8),
//                in -> in.f0
//        ));

        // This constructor can write to both non-partitioned and partitioned topics
        ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(pulsarUrl);
        clientConf.setAuthentication(new AuthenticationDisabled());
        ProducerConfigurationData producerConf = new ProducerConfigurationData();
        producerConf.setMessageRoutingMode(MessageRoutingMode.SinglePartition);
        producerConf.setTopicName("persistent://test-tenant/ns1/tp4");

        sum.addSink(new FlinkPulsarProducer<>(
                clientConf,
                producerConf,
                in -> in.toString().getBytes(StandardCharsets.UTF_8),
                in -> in.f0
        ));
        env.execute();

    }
}
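With the console producer from step 1.b and the Pulsar consumer from step 1.d running, typing a line such as flink pulsar flink into the producer should make the Pulsar consumer print records like (flink,2) and (pulsar,1) once the 3-second window fires, since the sink serializes each Tuple2 via its toString.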


3. Notes

Once more: before you start, check your Kafka, Pulsar, and Flink versions and add the matching dependencies to your pom.

For the Kafka connector dependencies, see the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/kafka.html
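In short: flink-connector-kafka-0.11_2.11 pairs FlinkKafkaConsumer011 with Kafka 0.11.x brokers, while for Kafka 1.0.0 and later Flink 1.7 also provides the universal flink-connector-kafka_2.11 connector.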

4. Fixing the error

If you are using Pulsar 2.4.1, you will definitely hit the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
	at pulsar.FlinkConsumeKafkaWritePulsar.main(FlinkConsumeKafkaWritePulsar.java:48)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
	... 4 more

The root cause is that in Pulsar 2.4.1, org.apache.pulsar.client.impl.DefaultBatcherBuilder does not implement Serializable, yet it is reachable from the producer configuration held by the sink, so Flink's ClosureCleaner fails to serialize it. The fix was merged upstream; follow the changes in this commit:

https://github.com/apache/pulsar/commit/6a67ae094b5ecfa1a4602b8f7baff9a838b44e23#diff-d4876cc56bcbd7b0fe549311203cd213

If you are a beginner, just follow the steps below.

Steps to fix

1. In your own project, create the package org.apache.pulsar.client.api, add an interface named BatcherBuilder under it, and copy in the following content:

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.internal.DefaultImplementation;

import java.io.Serializable;

/**
 * Batcher builder
 */
public interface BatcherBuilder extends Serializable {

    /**
     * Default batch message container
     *
     * incoming single messages:
     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
     *
     * batched into single batch message:
     * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
     */
    BatcherBuilder DEFAULT = DefaultImplementation.newDefaultBatcherBuilder();

    /**
     * Key based batch message container
     *
     * incoming single messages:
     * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
     *
     * batched into multiple batch messages:
     * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
     */
    BatcherBuilder KEY_BASED = DefaultImplementation.newKeyBasedBatcherBuilder();

    /**
     * Build a new batch message container.
     * @return new batch message container
     */
    BatchMessageContainer build();

}


2. Create the package org.apache.pulsar.client.impl, add a class named DefaultBatcherBuilder under it, and copy in the following content:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.BatcherBuilder;

public class DefaultBatcherBuilder implements BatcherBuilder {
    public DefaultBatcherBuilder() {
    }

    private static final long serialVersionUID = 1L;

    public BatchMessageContainer build() {
        return new BatchMessageContainerImpl();
    }
}


3. In the same org.apache.pulsar.client.impl package, create a class named KeyBasedBatcherBuilder and copy in the following content:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.BatcherBuilder;

public class KeyBasedBatcherBuilder implements BatcherBuilder {
    public KeyBasedBatcherBuilder() {
    }

    private static final long serialVersionUID = 1L;

    public BatchMessageContainer build() {
        return new BatchMessageKeyBasedContainer();
    }
}
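These overrides take effect because, in a standard Maven build, classes compiled in your own project come before the pulsar-client jar on the classpath, so the Serializable versions above shadow the originals. As a quick sanity check, here is a small sketch (my own helper class, not part of the upstream fix) that serializes a DefaultBatcherBuilder the same way Flink's ClosureCleaner does:

package pulsar;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;

import org.apache.pulsar.client.impl.DefaultBatcherBuilder;

public class BatcherBuilderSerializationCheck {
    public static void main(String[] args) throws Exception {
        // Flink's ClosureCleaner relies on plain Java serialization,
        // so if this succeeds, the "not serializable" error is resolved.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new DefaultBatcherBuilder());
        }
        System.out.println("DefaultBatcherBuilder serialized OK, " + bytes.size() + " bytes");
    }
}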


====================================================================

@羲凡 - Just to live a better life

If you have any questions about this post, feel free to leave a comment.
