Flink是一个流式数据处理框架。我们与Java进行整合的第一个例程基于简单的原则,从本地一次性获取需要的数据,一次性处理完成。这个例程在本地运行,不需要服务器的支撑。
创建一个maven工程。
一、pom依赖。
<properties>
<flink.version>1.13.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
这三个依赖都是必须的,版本要一致。如果准备好将来要放远程服务集群去执行,也要和服务器的版本保持一致。
二、测试类。
package com.chris.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* @author Chris Chan
* Create on 2021/5/22 7:23
* Use for:
* Explain: Flink批处理
*/
public class WordCountBatchTest {
public static void main(String[] args) throws Exception {
new WordCountBatchTest().execute(args);
}
private void execute(String[] args) throws Exception {
//获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//解析文本文件路径
String filePath = getClass().getClassLoader().getResource("batch.txt").getFile();
//读取文本文件
DataSource<String> dataSource = env.readTextFile(filePath);
//wordcount计算
AggregateOperator<Tuple2<String, Integer>> operator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
/**
* map计算
* @param value 输入数据 用空格分隔的句子
* @param out map计算之后的收集器
* @throws Exception
*/
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//用空格分隔为单词
String[] words = value.split(" ");
//统计单词使用频次,放入收集器
Arrays.stream(words)
//洗去前后空格
.map(String::trim)
//过滤掉空字符串
.filter(word -> !"".equals(word))
//加入收集器
.forEach(word -> out.collect(new Tuple2<>(word, 1)));
}
})
//按照二元组第一个字段word分组,把第二个字段统计出来
.groupBy(0).sum(1);
operator.print();
}
}
测试逻辑并不复杂,大致应该是执行了三个任务,完成了map和reduce两个步骤,实现了WordCount的效果。
本地运行测试。
测试结果打印的是收集好的二元组信息。业务使用的时候应该是要转换为自己专用的数据类型的。