天天看点

Flink源码解析——数据源读入原理

Flink是分布式并行计算框架,所以Flink程序内在是分布和并行的,其并行的特性可在下述代码片段体现:

val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath)
    val data = text.flatMap(_.split(" "))
    var count1 = 0
    val counter1 = data.map{
      t =>
        count1 += 1
        (t, count1)
    }
    counter1.print()
           

其中

inputPath

文件中存储的内容为以下格式:

a
b c
d
           

该段代码输出为:

(b,1)
(c,2)
(a,1)
(d,1)
           

由以上简单示例可知,Flink程序在读入文本时是并行读入的,提交Flink Job后,每一行数据为DataSet中的一个数据单元,由某一个TaskManager中的某一个slot进行计算,因此常规的累加操作是针对一个slot中需要处理的数据,无法对整体的数据进行累加操作。但是在编程过程中,发现一件很奇怪的事:如果不使用

env.readTextFile

读取数据,而使用

env.fromElements

读取数据,程序可以正常进行计数,其输出结果为:

(a,1)
(b,2)
(c,3)
(d,4)
           

于是笔者查看了Flink的源码,发现

fromElements

的具体实现是这样的:

def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T] = {
    require(data != null, "Data must not be null.")
    val typeInfo = implicitly[TypeInformation[T]]
    fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
  }
           

它调用了

fromCollection

创建的dataSource,而

fromCollection

的具体实现是这样的:

def fromCollection[T: ClassTag : TypeInformation](
      data: Iterable[T]): DataSet[T] = {
    require(data != null, "Data must not be null.")

    val typeInfo = implicitly[TypeInformation[T]]
    CollectionInputFormat.checkCollection(data.asJavaCollection, typeInfo.getTypeClass)
    val dataSource = new DataSource[T](
      javaEnv,
      new CollectionInputFormat[T](data.asJavaCollection, typeInfo.createSerializer(getConfig)),
      typeInfo,
      getCallLocationName())
    wrap(dataSource)
  }
           

readTextFile

的具体实现是这样的:

def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String] = {
    require(filePath != null, "The file path may not be null.")
    val format = new TextInputFormat(new Path(filePath))
    format.setCharsetName(charsetName)
    val source = new DataSource[String](javaEnv, format, BasicTypeInfo.STRING_TYPE_INFO,
      getCallLocationName())
    wrap(source)
  }
           

比较

fromCollection

方法和

readTextFile

方法的具体实现,可以看出其大致过程其实基本一致,无非就是

new

一个

DataSource

然后返回,但是可以看出其构造

DataSource

的参数类型有些不同,具体哪个参数类型有问题,我们可以继续观察

DataSource

类的构造函数,如下:

public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
		super(context, type);
		
		this.dataSourceLocationName = dataSourceLocationName;
		
		if (inputFormat == null) {
			throw new IllegalArgumentException("The input format may not be null.");
		}
		
		this.inputFormat = inputFormat;
		
		if (inputFormat instanceof NonParallelInput) {
			this.parallelism = 1;
		}
	}
           

OK,Flink源码跟踪到这基本要水落石出了,我们可以看出,构造函数中写了一个

if

判断,如果

inputFormat

NonParallelInput

接口的一个实例,则读取数据的过程并行度设置为1。

fromElements

方法中的输入类型参数为

CollectionInputFormat

,查看该类实现了哪些接口,如下:

public class CollectionInputFormat<T> extends GenericInputFormat<T> implements NonParallelInput {}
           

由此可见,

fromElements

方法之所以能够对整体进行计数,是由于其底层实现将该过程的并行度设置为1。

综上,我们如果需要使用

readTextFile

方法对数据进行有序读取、计数,则可以根据Flink源码中

fromElements

方法的实现思路,将读取数据操作的并行度设置为1。当数据量庞大时,这样的做法会可能会导致计算从数据源处开始瘫痪,因此最好不要采用该种方法,代码测试可以考虑采用该种方法。

那么还有什么方法可以在并行环境下对整体数据进行计数呢?可以参照很多种语言中都有的Static静态变量的思路,静态变量可以在它的作用域内,被所有类实例共享。因此可以考虑将用于计数的

count

变量设置为被整个Flink程序共享的一个变量,保证在任意TaskManager的任意的Slot中都是对同一个

count

变量进行更新。一开始考虑使用广播变量将用于计数的

count

变量广播到每一个并行度中,但广播变量必须是

DataSet[T]

类型的算子,并且每一个Slot只能对广播变量进行访问,暂没有找到可以修改广播变量的方法,因此这个处理的想法夭折了。目前可考虑的方法只有设置并行度或者通过文本预处理达到计数目的,若有新的想法会在博客更新,也欢迎讨论。

继续阅读