maxcompute graph 是基于飞天平台实现的面向迭代的图处理框架,为用户提供了类似于 pregel 的编程接口。maxcompute graph(以下简称 graph )作业包含图加载和计算两个阶段:
加载,将存储在表中的数据载入到内存中,以点和边的形式存在;
图的原始数据存在于maxcompute 的表(table)中,每个 table 包含多个记录(record),每个 record 又包含多个列(field),图加载就是将这种形式的数据,转换成 vertex 和 edge的过程。
我们将图的加载过程实现为一个网络版的mapreduce过程,如下图所示:
下面我们分三个阶段,分别解析图载入的具体实现。
图作业将maxcompute 中的table作为输入,需要做的第一步就是将输入表中的record分块,以便并行载入。输入的分解与maxcompute mr相同,都是经过一下步骤:
获取图作业的所有输入表或者分区;
拿到输入表或分区下的所有数据文件;
按照指定字节数(块大小)将大文件进行切割,或者小文件合并;
切割后的块或者多个小文件组成的块,就是最终每个worker的输入(split);
额外解释一下第三步的切割(或合并),我们知道表或者分区的数据都是存储在一个或多个文件中的,而文件有大有小。在根据指定的字节数 split size划分块时,我们将大于split size的文件称为大文件,小于split size的小文件。
因此,为了保证每个块字节数基本相等,需要将大文件按照split size切分成小块,具体形式就是有三元组(文件名,起始偏移字节,块长度)表示,文件尾剩余的字节数如果过小,会合并到最后一块中。而对于小文件们,则需要将多个小文件作为一个块,具体形式有多个三元组组成【(文件名,0,文件长度),(文件名,0,文件长度)】,最终块的大小也会受块中小文件的个数限制(保证不会因为过多的小文件导致载入时间过长)。
至此,我们已经将输入划分成了块,假设块的个数为m,根据不同的作业配置,数据并行载入的方式也不相同。
对于此种情况,worker和split是一对一的关系,worker数由split的个数决定,如果split个数超过1000,则作业会因为worker数超过1000报错。
graph作业同时支持用户手工设置worker数,这时候又会出现一个worker对应零个或多个split的情况。
如果split size比较小,导致split的个数多于设定的worker数,则worker与split的对应关系如下图:
如果split size比较大,导致split的个数少于设定的worker数,则worker与split的对应关系如下图:
因此,用户手工设定 worker 数的情况下,调整split size的大小,有助于提高数据的载入速度。
总结一下,本小节将输入分解成了块,并确定了worker和块之间的关系。不管哪种情况,最终的结果是,一个 worker 会分到零个或多个 split,下面就需要解析一个 worker 是怎么处理 split 的。
如上节所述,本节解释单个 worker 处理零个或多个 split 的执行过程。此阶段的实现需要用户实现 graphloader 接口,我们会为每个 split 创建单独的 graphloader 实例,这个实例负责将 split 中的 record 解释为 vertex 或者 edge。根据此 worker 分到的 split 的个数,分为三种情况:
worker 分到零个 split。此种情况下该 worker 无需进入此阶段,直接进入下一阶段。
从整体看,graphloader 实例会读取 split 中的每个 record,用户可根据此 record 中的语义(如来自哪张表,字段的含义等)解析成 vertex 和(或)edge,解析后的结果以键值对的形式存在,(id, vertex)表示为值为 id 的点增加一个 vertex 对象,(id, edge)表示为值为id的点增加一个 edge 对象。请注意,vertex 本身也是可以带有 edge 的。
此阶段,可以为单个 id 多次增加 vertex 或 edge 对象,无须关系是否重复,下个阶段会处理这种情况。此过程将 split 中的多个 record,转换成了多个 vertex 或 edge对象。用户可以将其理解成一个 map 过程,将 record 转换成一些(key, value),key本身会重复。
worker分到多个 split。第二种情况是此第三种情况的特例,表现为一个 worker 多次处理 split 的情况,每次与第二种情况相同。注意,每个 split 有单独的 graphloader 实例。
至此,我们已经将 split 中的 record,都转换成了(id, vertex)和(id, edge)这种形式的键值对,这些键值对是通过用户的 graphloader 接口生成的。我们在拿到这些键值对之后,会根据 id 将这些键值对分发到其它的 worker,默认的分发依据是 id 的hash值。同时,每个 worker 在接收到这些键值对后,会根据 id 分组,将相同 id的 vertex 和 edge 组织起来,具体组织形式见下图:
也就是组织成 key -> values 的形式,类似于 reduce 的输入形式,但不同的是,values 会根据 value 的类型进行划分,这里会将一个 id 对应的 values 划分为 vertex 和 edge 集合。
总结一下,本小节将所有 split 解析成了(id, vertex)和(id, edge),并将这些键值对 shuffle 到了指定的 worker,下面就需要解析 worker 是怎么处理这些键值对的。
graph 中图是以邻接表的形式存储的,因此,内存中的对象组织形式就是,每个点对应一个 vertex 对象,而边作为 vertex 对象的成员变量存在,表示此 vertex 的出边。
因此,我们此阶段要做的工作,就是将从上一节得到的键值对,封装到一个 vertex 对象中,并把此 vertex 对象添加到最终的图中,此 vertex 对象也是计算阶段迭代遍历的vertex对象。
我们用下图表示此阶段的执行过程:
此过程,针对每个 id 汇总后的结果,将上一节中针对此 id 的添加的所有 vertex 和 edge 都封装到一个 vertex 对象中。封装的过程是通过 vertexresolver 接口完成的,此接口我们会提供一个默认实现,用户也可以自定义,根据自己作业的情况进行特殊处理。
针对一个 id 添加的 vertex 和 edge,我们提供的默认实现有如下步骤:
检查是否添加了 vertex,没有添加 vertex 或者添加了多个 vertex时报错;
检查 vertex 中的 edge是否有重复,如果存在重复则报错;
如果添加了 edge,检查 vertex 中的 edge 和添加的 edge 是否有重复,如果存在重复则报错;
总结一下,本小节我们将 shuffle 之后的(id, vertex)和(id, edge)键值对,成功的封装成了最终的 vertex 对象,这些 vertex 对象最终也将参与后续的迭代计算。请注意,对于每个 id,都会经过这个阶段,并不是只有冲突的时候才会这样。
经过上一章节的介绍,我们已经对整个图数据加载的原理有了初步的认识,下面我们开始介绍主要的接口,从上一章节得知,用户只需要实现 graphloader 和 vertexresolver 接口,而 vertexresolver 接口我们提供了默认实现,因此一般情况下,用户只需要自定义 graphloader 接口就可以了。
先看一下 graphloader 接口的代码:
首先解释一下四个泛型参数的含义:
vertex_id 代表点的 id 的数据类型;
vertex_value 代表点的值的数据类型;
edge_value 代表边的值的数据类型;
message 代表迭代时消息的数据类型; 这些类型都是用户可以自定义的,但是都必须都实现 writable 接口。
我们现在来看 graphloader 的两个方法的含义:
setup是 graphloader 的配置方法,该方法在每个 graphloader 实例只被调用一次,我们会将作业的配置信息,当前 worker 的id,当前输入的 table 或 partiton 信息作为参数传递给 setup,用户可以使用这些参数初始化上下文信息。其中,在 main 方法中配置的信息可以在此处通过 configuration 拿到,当前输入的表名或分区名可以通过 tableinfo 拿到。
load 针对 split 中的每个 record 调用一次,用户可以读取 record 中各列的数据,转换成 vertex 和(或)edge,并通过 mutationcontext 接口请求添加到图中。mutationcontext接口的方法:
其中 getconfiguration 方法的返回值与 setup 的 configuration 参数相同,genumworkers获取作业总的 worker 的个数,getcounter 是允许用户自定义 counter 来做一些统计,在作业结束时,这些 counter 可以通过 sdk 被获取到。最重要的是剩余的四个方法,其中 removevertexrequest 和
removeedgerequest 方法可以暂时忽略,它们的作用是可以根据输入数据,可以选择去删除一些点和边,在数据载入阶段,这种需求并不常见。
我们重点关注addvertexrequest和addedgerequest,这两个方法的作用,就是生成我们之前提到的(id, vertex)和(id, edge)。addvertexrequest(vertex vertex)会生成(id, vertex)的键值对,id 是 vertex 的 id。addedgerequest(i sourcevertexid, edge edge) 会生成(id, edge)的键值对,id是边的起始点,edge包含边的终点和值。,>
总结一下,实现 graphloader 的目的,就是根据上下文,在 load 方法中,解释 record,并调用 mutationcontext 接口,请求将 vertex 和 edge 添加到图中。
请注意,这里的方法名字都带有request字样,意思就是说调用这些方法,只代表一种请求,最终这些请求是否生效,取决于 vertexresolver 的实现。
在所有的 split 被 graphloader 处理结束后,我们会做一次同步,使得所有 worker 统一开始执行 vertexresolver,我们先看一下 vertexresolver的方法:
每个 worker 上只会创建一个 vertexresolver的实例,这个实例负责处理属于此 worker 的所有点,即将关于一个 id 的 vertex 和 edge 封装成一个 vertex 对象。
configure 方法 vertexresolver 的配置方法,每个实例只会被调用一次,用于获取作业配置信息;
resolve 方法针对每个 id 调用一次,用于处理所有关于此 id 的 vertex 和 edge,它的返回值是一个vertex 对象,表示将此 vertex 添加到最终的图中。参数比较复杂,我们展开介绍:
vertexid 是当前要处理的 vertex 的 id,也就是之前所说的 key;
vertex 是当前已存在的 vertex 对象,在数据载入阶段,这个参数一定为 null;
vertexchanges 是关于此 id 所有的变动集合,主要是 graphloader 中添加的 vertex 和 edge,也就是之前所说的 values;
hasmessages 表示此 id 是否有收到message,在数据载入阶段,这个参数一定为false; 因此,这四个参数中,只需要关心 vertexid 和 vertexchanges 两个参数,而 vertexchanges就是个集合,它来自于 graphloader.load方法中,调用 mutationcontext.add*request接口添加的 vertex 和 edge,vertexchanges 内部也按照 vertex 和 edge 划分了不同的子集合,我们先看一下 vertexchanges 接口的方法:
看这个四个方法,是不是和 mutationcontext 中的方法相似?是的, mutationcontext中的add/remove*reuqest方法分别与这里的get*list对应,也解释了上一小节所说的。mutationcontext 的调用都是些请求,并不一定是最终的 vertex 和 edge,因为它们要在这里进行经过处理才能决定是否真正要添加到最终的图中。
总结一下,vertexresolver 是数据加载的最后一步,它的作用就是构造最终的图结构,构造过程就是将 graphloader 里生成的键值对进行封装,使得最终的图结构以一种邻接表的形式存在。
我们举两种类型的示例说明图的加载过程。输入数据一般可以划分成边类型和点类型的数据,我们分开举例说明。
边类型的数据可以用下面的表格表示:
sourcevertexid
destinationvertexid
edgevalue
id0
id1
9
id2
5
4
每条record表示图中的一条边的格式,上图表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
对应的 graphloader 的实现为:
对应的 vertexresolver的实现为:
我们分析一个多路输入的例子。graph作业指定两张表作为输入,一张是边类型的数据,格式如1中所示,另一张是点类型的数据,格式可以用下面的表格表示:
vertexid
vertexvalue
7
8
表示有三个点,id分别是id0,id1,id2,对应的点的值分别是9,7,8。
对应的 vertexresolver 的实现为: