
Flink Local Environment Setup

Local environment

Java 1.8+ on macOS

Reference

https://www.cnblogs.com/ldsweely/p/11980890.html

Start the cluster

bin/start-cluster.sh 

Access the web UI

http://127.0.0.1:8081/#/overview


Example: word count

package com.jihitee.myflink.flinklearn;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.readTextFile("/path/flinklearn/in.txt");

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        String outputPath = "/path/out.txt";
        counts.writeAsCsv(outputPath, "\n", " ");
        env.execute("myflink");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.split(" ");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
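
As a quick check when running the job from the IDE, the CSV sink can be swapped for a print to stdout. This is only a convenience sketch, not part of the original example; in the DataSet API, print() triggers execution by itself, so the explicit env.execute(...) call must be dropped when it is used.

        // Convenience sketch: print the word counts to stdout instead of writing the CSV file.
        // DataSet#print() collects the result and runs the program itself, so do not also call env.execute().
        counts.print();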

Run the job

./bin/flink run -c com.jihitee.myflink.flinklearn.SocketWindowWordCountJava  /youpath/flinklearn-1.0-SNAPSHOT.jar
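
Despite its name, the class above is a batch job built on the DataSet API that reads from a file. For comparison, a streaming variant that actually reads from a socket and counts words in time windows, closer to what the class name suggests, could look roughly like the sketch below. It is only a sketch under assumptions: an older Flink 1.x DataStream API where KeyedStream#timeWindow is still available, a local text source opened with nc -lk 9000, and a made-up class name StreamingSocketWordCount.

package com.jihitee.myflink.flinklearn;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class StreamingSocketWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // read lines from a local socket, e.g. one opened with: nc -lk 9000
        DataStream<String> text = env.socketTextStream("localhost", 9000);

        DataStream<Tuple2<String, Integer>> counts = text
                // reuse the Tokenizer from the batch example above: emit (word, 1) pairs
                .flatMap(new SocketWindowWordCountJava.Tokenizer())
                // group by the word and count it within 5-second tumbling windows
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        counts.print();
        env.execute("streaming socket word count");
    }
}

When such a job is submitted to the local cluster, the print() output goes to the TaskManager's .out log (visible in the web UI) rather than the client console.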

Troubleshooting

https://blog.csdn.net/qq_34321590/article/details/106991437

Flink local mode error: Exception in thread "main" java.lang.NoClassDefFoundError

Solution:

Edit the dependencies in pom.xml and comment out <scope>provided</scope>. With provided scope, the Flink dependencies are not placed on the runtime classpath when the job is launched directly from the IDE, which is what causes the NoClassDefFoundError:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>