Java Flink Series, Part 1: Maven Project Setup and a Beginner Java Example Written Several Ways

1. Flink Project Dependency Configuration

The pom.xml below pins Flink 1.9.3 built against Scala 2.11 and declares the connector dependencies and build plugins used throughout this series.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hainiu</groupId>
  <artifactId>hainiuflink</artifactId>
  <version>1.0</version>

  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11</scala.version>
    <flink.version>1.9.3</flink.version>
    <parquet.version>1.10.0</parquet.version>
    <hadoop.version>2.7.3</hadoop.version>
    <fastjson.version>1.2.72</fastjson.version>
    <redis.version>2.9.0</redis.version>
    <mysql.version>5.1.35</mysql.version>
    <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.7</slf4j.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.build.scope>compile</project.build.scope>
    <!--        <project.build.scope>provided</project.build.scope>-->
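    <!-- Tip: keep compile for running inside the IDE; switch to provided when packaging a jar for
         cluster submission, so Flink's core libraries already on the cluster are not bundled twice. -->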
    <mainClass>com.hainiu.Driver</mainClass>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>

    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>${log4j.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Hadoop client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink's Hadoop compatibility -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Java API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Streaming Java API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Scala API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Streaming Scala API -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink runtime web UI -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- RocksDB state backend for Flink state -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink HBase connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-hbase_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Elasticsearch 5 connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch5_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink Kafka 0.10 connector -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Flink filesystem connector for writing files to HDFS -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-filesystem_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>${mysql.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Redis client (Jedis) -->
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>${redis.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- Parquet file format support for Flink -->
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
      <version>${parquet.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
      <version>${parquet.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-parquet_${scala.version}</artifactId>
      <version>${flink.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
    <!-- JSON handling -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
      <scope>${project.build.scope}</scope>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptors>
            <descriptor>src/assembly/assembly.xml</descriptor>
          </descriptors>
          <archive>
            <manifest>
              <mainClass>${mainClass}</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.12</version>
        <configuration>
          <skip>true</skip>
          <forkMode>once</forkMode>
          <excludes>
            <exclude>**/**</exclude>
          </excludes>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>${project.build.sourceEncoding}</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>
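The assembly plugin above reads a descriptor from src/assembly/assembly.xml, which this post does not show. A minimal descriptor along these lines should work; this is a sketch under assumptions (the id and the unpack strategy are illustrative, not taken from the original project):

<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
  <id>jar-with-dependencies</id>
  <formats>
    <format>jar</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <dependencySets>
    <dependencySet>
      <!-- Unpack the project and its runtime dependencies into a single fat jar -->
      <outputDirectory>/</outputDirectory>
      <unpack>true</unpack>
      <scope>runtime</scope>
    </dependencySet>
  </dependencySets>
</assembly>

After mvn package, the resulting jar can be submitted with flink run; the entry point comes from the mainClass manifest entry configured in the pom.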

2. Java Flink Example

The class below reads lines from a socket (feed it with nc -lk 6666) and counts words in five styles: a lambda, anonymous function classes, a combined function, a RichFunction, and chained ProcessFunctions. Styles 1 through 4 are left commented out; style 5 is the one that runs.

package com.linwj.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment scc = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
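        // createLocalEnvironmentWithWebUI starts a local cluster plus the Flink dashboard,
        // backed by the flink-runtime-web dependency declared in the pom.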
        DataStreamSource<String> socket = scc.socketTextStream("localhost", 6666);  // nc -lk 6666

//        // 1. Lambda style
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap((String a, Collector<String> out) -> {   // a Collector<String> out is passed in to emit results; more examples: https://vimsky.com/examples/detail/java-method-org.apache.flink.util.Collector.collect.html
//            Arrays.stream(a.split(" ")).forEach((x1) -> {
//                out.collect(x1);
//            });
//        }).returns(Types.STRING);
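//        // Note: .returns(...) is needed because Java erases the lambda's generic types at
//        // compile time, so Flink cannot infer the element type of Collector<String> on its own.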
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t2_map =
//                t1_flatmap.map((x1) -> Tuple3.of(x1, 1, "gg")).returns(Types.TUPLE(Types.STRING,Types.INT,Types.STRING));
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple3<String, Integer, String>> t3_keyBy_sum =
//                t2_map.keyBy(0).sum(1); // no "->" lambda here, so no explicit return type declaration is needed
//        t3_keyBy_sum.print();
//
//        scc.execute();
//
//        // 2. Anonymous function class style
//        /*
//        1) Anonymous inner class syntax: new ParentClassOrInterface(){ define subclass members or override parent methods }.method(). A regular inner class, by contrast, has a class name of its own.
//        2) Writing FlatMapFunction without the <String,String> generics fails to compile: Class 'Anonymous class derived from FlatMapFunction' must either be declared abstract or implement abstract method 'flatMap(T, Collector<O>)' in 'FlatMapFunction'
//        Keep <String,String> consistent with the generics of the interface or class being implemented
//         */
//        SingleOutputStreamOperator<String> t1_flatmap = socket.flatMap(new FlatMapFunction<String,String>() {
//            @Override
//            public void flatMap(String value, Collector<String> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(ss);
//                }
//            }
//        });
//        t1_flatmap.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t2_map = t1_flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
//            @Override
//            public Tuple2<String, Integer> map(String s) throws Exception {
//                return Tuple2.of(s, 1);
//            }
//        });
//        t2_map.print();
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> t3_keyBy_sum = t2_map.keyBy(0).sum(1);
//        t3_keyBy_sum.print();

//        // 3. Combined function style: flatMap emits (word, 1) tuples directly
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    out.collect(Tuple2.of(ss, 1));
//                }
//            }
//        });
//
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

//        // 4. Combined RichFunction style
//        SingleOutputStreamOperator<Tuple2<String, Integer>> flatmap = socket.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {
//            private String name = null;
//
//            @Override
//            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                String[] s = value.split(" ");
//                for (String ss : s) {
//                    System.out.println(getRuntimeContext().getIndexOfThisSubtask()); // RichFlatMapFunction is a "rich" function: it additionally offers getRuntimeContext(), which exposes state plus runtime info such as parallelism and task name.
//                    out.collect(Tuple2.of(name + ss, 1));
//                }
//
//            }
//
//            @Override
//            public void open(Configuration parameters) {
//                name = "linwj_";
//            }
//
//            @Override
//            public void close() {
//                name = null;
//            }
//
//        });
//        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = flatmap.keyBy(0).sum(1);
//        sum.print();

        // 5. Combined ProcessFunction style (the active example)
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socket.process(new ProcessFunction<String, Tuple2<String, Integer>>() {

            private String name = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                name = "linwj";
            }

            @Override
            public void close() throws Exception {
                name = null;
            }

            @Override
            public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
//                getRuntimeContext()
                String[] s = value.split(" ");
                for (String ss : s) {
                    System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                    out.collect(Tuple2.of(name + ss, 1));
                }
            }
        }).keyBy(0).process(new KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
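            // Caveat: num is a plain instance field shared by every key this parallel subtask
            // handles; it is not per-key state. A keyed-state variant is sketched after the class.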
            private Integer num = 0;

            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                num += value.f1;
                out.collect(Tuple2.of(value.f0,num));
            }
        });
        sum.print();

        scc.execute();



    }
}
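A closing aside that is not part of the original post: because num above is an ordinary field, counts for different words routed to the same subtask bleed into one another. Flink's keyed ValueState scopes the counter to the current key and makes it part of checkpoints. A minimal sketch of that variant follows; the class name CountWithState is illustrative, everything else uses the Flink 1.9 keyed-state API:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Per-key counting with managed keyed state instead of a shared instance field.
class CountWithState extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private transient ValueState<Integer> countState;

    @Override
    public void open(Configuration parameters) {
        // Register the state: Flink keeps one Integer per key under the name "count".
        countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx,
                               Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = countState.value();  // value() reads the state of the current key only
        if (current == null) {
            current = 0;                       // first element seen for this key
        }
        current += value.f1;
        countState.update(current);
        out.collect(Tuple2.of(value.f0, current));
    }
}

Swapping new CountWithState() in for the anonymous KeyedProcessFunction in style 5 yields correct per-key totals, and with a state backend such as the RocksDB one declared in the pom, the counters also survive restarts.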