Elasticsearch是一个基于Lucene的分布式搜索引擎,具有分布式、全文检索、近实时搜索和分析、高可用、模式自由、RESTFul API等诸多优点,在实时搜索、日志处理(ELK)、大数据分析等领域有着广泛的应用。Hadoop是一个由Apache基金会所开发的分布式系统基础架构,核心组件有HDFS和MapReduce,分别提供海量数据存储和海量数据计算。
图1 ES-Hadoop简介
Elasticsearch for Apache Hadoop是一个用于Elasticsearch和Hadoop进行交互的开源独立库,简称ES-Hadoop,在Hadoop和Elasticsearch之间起到桥梁的作用,完美地把Hadoop的批处理优势和Elasticsearch强大的全文检索引擎结合起来。
ES-Hadoop开辟了更加广阔的应用空间,通过ES-Hadoop可以索引Hadoop中的数据到Elasticsearch,充分利用其查询和聚合分析功能,也可以在Kibana中做进一步的可视化分析,同时也可以把Elasticsearch中的数据放到Hadoop生态系统中做运算,ES-Hadoop支持Spark、Spark、 Streaming、SparkSQL,除此之外,不论是使用Hive、 Pig、Storm、Cascading还是运行单独的Map/Reduce,通过ES-Hadoop提供的接口都支持从Elasticsearch中进行索引和查询操作。
本文基于阿里云E-MapReduce和阿里云Elasticsearch,演示如何通过ES-Hadoop连通Hadoop生态系统和Elasticsearch。
图2 阿里云产品地图
因为在公网访问推送数据安全性较差,为保证阿里云Elasticsearch访问环境安全,购买阿里云ES产品,对应区域下必须要有 VPC 和 虚拟交换机,因此首先开通VPC专有网络。按路径:阿里云首页-->产品-->网络->专有网络VPC,然后选择立即开通,进入到管理控制台界面,新建专有网络。
图3 创建专有网络
创建完成之后在控制台中可以进行管理:
图4 专有网络管理
按路径:阿里云首页-->产品-->数据库-->Elasticsearch或阿里云首页-->产品-->大数据基础服务-->Elasticsearch进入到阿里云Elasticsearch产品界面,新用户可以免费试用30天。
进入到购买入口,阿里云Elasticsearch提供了按月和按量两种付费模式,选择已经创建的专有网络并设置登录密码。
图5 阿里云Elasticsearch选购配置
购买成功后,按路径:控制台-->大数据(数加)-->Elasticsearch,可以看到新创建的Elasticsearch集群实例。
图6 阿里云Elasticsearch实例列表页
点击"管理"菜单,进入集群管理界面:
图7 阿里云Elasticsearch集群管理
点击"Kibana控制台"按钮即可进入到Kibana操作界面:
图8 阿里云Elasticsearch集群Kibana操作管理界面
点击"集群监控"按钮进入到监控界面:
图9 阿里云Elasticsearch集群监控界面
按路径:阿里云首页-->产品-->大数据基础服务-->E-MapReduce,之后进入到购买界面:
图10 阿里云E-MapReduce软件配置
下一步进行付费配置、网络配置和节点硬件配置:
图11 阿里云E-MapReduce硬件配置
最后设置集群名称、日志路径和集群登录密码:
图12 阿里云E-MapReduce基础配置
图13 阿里云OSS创建bucket
bucket创建完成以后就可以新建目录、上传文件:
图14 阿里云OSS文件管理
最后确定,完成EMR集群的创建:
图15 阿里云E-MapReduce确认页
集群创建成功后在集群列表中查看:
图16 阿里云E-MapReduce集群列表
点击管理,可以查看master节点和data节点的详细信息:
图17 阿里云E-MapReduce集群详细信息
公网IP可以直接访问,远程登录:
使用jps命令查看后台进程:
推荐使用maven来进行项目管理,首先创建一个maven工程。操作步骤如下:
2.生成工程框架。在工程根目录处执行如下命令:
mvn 会自动生成一个空的 Sample 工程,工程名为emrtoes(和指定的artifactId一致),里面包含一个简单的 pom.xml 和 App 类(类的包路径和指定的 groupId 一致)
3.加入 Hadoop 和ES-Hadoop依赖。使用任意 IDE 打开这个工程,编辑 pom.xml 文件。在 dependencies 内添加如下内容:
4.添加打包插件。由于使用了第三方库,需要把第三方库打包到jar文件中,在pom.xml中添加maven-assembly-plugin插件的坐标:
5.编写代码。在com.aliyun.emrtoes包下和 App 类平行的位置添加新类 EmrToES.java。内容如下:
6.编译并打包。在工程的目录下,执行如下命令:
执行完毕以后,可在工程目录的 target 目录下看到一个emrtoes-1.0-SNAPSHOT-jar-with-dependencies.jar,这个就是作业 jar 包。
图18 IDEA中EMR写ES工程目录截图
把下面的数据写入到blog.json中:
上传到阿里云E-MapReduce集群,使用scp远程拷贝命令上传文件:
把blog.json上传至HDFS:
把maven工程target目录下的jar包上传至阿里云E-MapReduce集群:
运行成功的话,控制台会输出如下图所示信息:
图19 阿里云E-MapReduce运行MR作业截图
命令查询Elasticsearch中的数据:
图20 命令查看阿里云Elasticsearch中的数据
或者在Kibana中查看:
图21 Kibana中查看阿里云Elasticsearch中的数据
Map过程,按行读入,input kye的类型为Object,input value的类型为Text。输出的key为NullWritable类型,NullWritable是Writable的一个特殊类,实现方法为空实现,不从数据流中读数据,也不写入数据,只充当占位符。MapReduce中如果不需要使用键或值,就可以将键或值声明为NullWritable,这里把输出的key设置NullWritable类型。输出为BytesWritable类型,把json字符串序列化。
因为只需要写入,没有Reduce过程。配置参数说明如下:
conf.set("es.net.http.auth.user", "你的X-PACK用户名");
设置X-PACK的用户名
conf.set("es.net.http.auth.pass", "你的X-PACK密码");
设置X-PACK的密码
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
关闭mapper阶段的执行推测
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
关闭reducer阶段的执行推测
conf.set("es.nodes", "你的Elasticsearch内网地址");
配置Elasticsearch的IP和端口
conf.set("es.resource", "blog/yunqi");
设置索引到Elasticsearch的索引名和类型名。
conf.set("es.mapping.id", "id");
设置文档id,这个参数”id”是文档中的id字段
conf.set("es.input.json", "yes");
指定输入的文件类型为json。
job.setInputFormatClass(TextInputFormat.class);
设置输入流为文本类型
job.setOutputFormatClass(EsOutputFormat.class);
设置输出为EsOutputFormat类型。
job.setMapOutputKeyClass(NullWritable.class);
设置Map的输出key类型为NullWritable类型
job.setMapOutputValueClass(BytesWritable.class);
设置Map的输出value类型为BytesWritable类型
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
传入HDFS上的文件路径
首先,总结一下实践过程中遇到的问题:
1.ClassNotFoundException异常
遇到找不到EsOutputFormat所在类,导致的ClassNotFoundException异常:
解决办法,使用maven-assembly-plugin插件,把第三方库打到jar包中。
2. 连接不到Elasticsearch集群
连接不到Elasticsearch集群的第一个原因是没有配置X-PACK 的用户名和密码,加上以下两行配置:
第二个原因就是EMR集群和Elasticsearch集群网络不通,在创建集群的时尽量选择同一区域,比如EMR集群在华东1区,Elasticsearch集群也在华东1区,事先用Ping命令测试。
第三个原因是端口,一般TCP端口(比如使用Java客户端)是9300,ES-Hadoop中使用的仍然是9200端口.
3.Reduce过程中格式错误
注意测试文件中每一行都是一个JSON,在设置中加上:
否则会出现解析文件格式异常。