天天看点

Dataworks同步数据到X-pack Spark

简介

本文主要介绍如何通过“Dataworks->数据集成->离线同步”把数据同步到X-pack Spark的hdfs上。同步数据到X-pack的hdfs后,就可以使用X-pack Spark对数据进行分析。

本例通过把Dataworks的一张表同步到X-pack Spark的hadfs为例,介绍如何同步数据。

前置条件

  1. X-pack Spark集群已经开通hdfs端口。需要联系X-pack Spark维护人员:“云X-Pack Spark答疑” 开通。

操作步骤

在Dataworks中创建“独享数据集成资源组”

X-pack Spark的hdfs是在VPC内,Dataworks要求一定要使用“独享数据集成资源组”才可以同步数据。

假设创建的“独享数据集成资源组”的名称为:test_cluster,如下图:

Dataworks同步数据到X-pack Spark

注意:可用区应要选择和X-pack Spark一样的可用区。

独享数据集成资源组的创建详细指导请参考Dataworks官方文档:“

独享资源组

对“独享数据集成资源组”进行“专有网络绑定”

创建完“独享数据集成资源组”之后需要对其操作“专有网络绑定”,如下图:

Dataworks同步数据到X-pack Spark

注意:“专有网络” 一定要选择和X-pack Spark相同的专有网络。 “交换机”和“安全组”建议选择和X-pack Spark相同的。(本例选择相同的)

在“X-pack Spark”中配置Dataworks的白名单。

需要在X-pack Spark中配置Dataworks的白名单,Dataworks才能访问到X-pack Spark。

打开上一步骤中绑定的“交换机”, 查看交换机的“IPv4网段”, 把“IPv4网段”对应的IP断添加到X-pack Spark的白名单中。

Dataworks同步数据到X-pack Spark

配置Dataworks的安全组出入端口。

“独享数据集成资源组”绑定“安全组”后,需要配置安全组的出入端口,保证“独享数据集成资源组”可以访问到X-pack Spark的hdfs。 需要打开8020和50070端口。

在Dataworks中创建表。

在Dataworks创建表test01,数据如下:

CREATE TABLE IF NOT EXISTS test01
(
    id   STRING,
    name STRING
) 
insert into test01 values('a', 'b')           

在Dataworks中创建“离线同步”。

创建离线同步把数据表test01同步到X-pack Spark的hdfs中。Dataworks不支持到hdfs的向导配置,需要切换到“脚本模式”配置任务。脚本内容如下:

{
    "type": "job",
    "steps": [
        {
            "stepType": "odps",
            "parameter": {
                "partition": [],
                "datasource": "odps_first",
                "column": [
                    "*"
                ],
                "guid": null,
                "emptyAsNull": false,
                "table": "test01"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "hdfs",
            "parameter": {
                "path": "/tmp",
                "fileName": "test01.txt",
                "compress": "GZIP",
                "defaultFS": "hdfs://${spark集群id}",
                "hadoopConfig": {
                    "dfs.ha.automatic-failover.enabled.${spark集群id}": true,
                    "dfs.namenode.http-address.${spark集群id}.nn1": "${spark集群id}-master1-001.spark.rds.aliyuncs.com:50070",
                    "dfs.namenode.http-address.${spark集群id}.nn2": "${spark集群id}-master2-001.spark.rds.aliyuncs.com:50070",
                    "dfs.client.failover.proxy.provider.${spark集群id}": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                    "dfs.nameservices": "${spark集群id}",
                    "dfs.ha.namenodes.${spark集群id}": "nn1,nn2",
                    "dfs.namenode.rpc-address.${spark集群id}.nn1": "ap-wz9t69njoc3xzt65y-master1-001.spark.rds.aliyuncs.com:8020",
                    "dfs.namenode.rpc-address.${spark集群id}.nn2": "ap-wz9t69njoc3xzt65y-master2-001.spark.rds.aliyuncs.com:8020"
                },
                "column": [
                    {
                        "name": "col1",
                        "type": "string"
                    },
                    {
                        "name": "col2",
                        "type": "string"
                    }
                ],
                "writeMode": "append",
                "encoding": "UTF-8",
                "fieldDelimiter": ",",
                "fileType": "text"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": ""
        },
        "speed": {
            "concurrent": 2,
            "throttle": false
        }
    }
}           

脚本说明:

"datasource": "odps_first": Dataworks默认创建的数据源。

"table": "test01": Dataworks的数据表:test01

"path": "/tmp": 写入数据到hdfs的路径。

"fileName": "test01.txt":写入数据到hdfs的文件名称。

"compress": "GZIP": 写入到hdfs的文件压缩格式。

"defaultFS": "hdfs://${spark集群id}": X-pack Spark hdfs集群的defaultFS,需要把${spark集群id}替换成自己的X-pack Spark集群ID。

"hadoopConfig": {xxx}: X-pack Spark hdfs集群的HA 的配置信息,需要把内容中的${spark集群id}替换成自己的X-pack Spark集群ID。

脚本配置完后,需要“配置任务资源组”, 点击“配置任务资源组” 选择第一步创建的“独享数据集成资源组”:test_cluster。 如下图:

Dataworks同步数据到X-pack Spark

在Dataworks中运行查看效果。

配置完毕后点击“运行”,等待运行成功。然后查询X-pack Spark的hdfs中是否有文件写入,当出现如下文件时,说明写入成功:

Dataworks同步数据到X-pack Spark

如何查看X-pack Spark hdfs文件,请参考:

Spark控制台

小结

数据同步到X-pack Spark hdfs 后可以同步X-pack Spark 控制分析hdfs的数据了。

继续阅读