天天看点

Java整合Flink批处理本地数据

Flink是一个流式数据处理框架。我们与Java进行整合的第一个例程基于简单的原则,从本地一次性获取需要的数据,一次性处理完成。这个例程在本地运行,不需要服务器的支撑。

创建一个maven工程。

一、pom依赖。

<properties>
    <flink.version>1.13.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
           

这三个依赖都是必须的,版本要一致。如果准备好将来要放远程服务集群去执行,也要和服务器的版本保持一致。

Java整合Flink批处理本地数据

二、测试类。

package com.chris.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * @author Chris Chan
 * Create on 2021/5/22 7:23
 * Use for:
 * Explain: Flink批处理
 */
public class WordCountBatchTest {
    public static void main(String[] args) throws Exception {
        new WordCountBatchTest().execute(args);
    }

    private void execute(String[] args) throws Exception {
        //获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //解析文本文件路径
        String filePath = getClass().getClassLoader().getResource("batch.txt").getFile();
        //读取文本文件
        DataSource<String> dataSource = env.readTextFile(filePath);
        //wordcount计算
        AggregateOperator<Tuple2<String, Integer>> operator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            /**
             * map计算
             * @param value 输入数据 用空格分隔的句子
             * @param out map计算之后的收集器
             * @throws Exception
             */
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                //用空格分隔为单词
                String[] words = value.split(" ");
                //统计单词使用频次,放入收集器
                Arrays.stream(words)
                        //洗去前后空格
                        .map(String::trim)
                        //过滤掉空字符串
                        .filter(word -> !"".equals(word))
                        //加入收集器
                        .forEach(word -> out.collect(new Tuple2<>(word, 1)));
            }
        })
                //按照二元组第一个字段word分组,把第二个字段统计出来
                .groupBy(0).sum(1);
        operator.print();
    }
}
           

测试逻辑并不复杂,大致应该是执行了三个任务,完成了map和reduce两个步骤,实现了WordCount的效果。

本地运行测试。

Java整合Flink批处理本地数据

测试结果打印的是收集好的二元组信息。业务使用的时候应该是要转换为自己专用的数据类型的。