文章目录
- 前言
- HDFS简介
- HDFS架构
- Java接口下的程序
- 准备
- 从Hadoop URL读取数据
- 通过FileSystem API读取数据
- 通过seek()方法,显示数据两次
- 将本地文件复制到Hadoop文件系统
- 展示文件状态信息
- 显示Hadoop文件系统中一组路径的文件信息
前言
以下示例均来自《Hadoop 权威指南》
HDFS简介
HDFS(Hadoop Distributed File System),分布式文件系统
HDFS,是Hadoop抽象文件系统的一种实现。Hadoop抽象文件系统可以与本地系统、Amazon S3等集成,甚至可以通过Web协议(webhsfs)来操作。HDFS的文件分布在集群机器上,同时提供副本进行容错及可靠性保证。例如客户端写入读取文件的直接操作都是分布在集群各个机器上的,没有单点性能压力。
HDFS架构
HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的。其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作;集群中的DataNode管理存储的数据。
Java接口下的程序
准备
这里提前在Hadoop的文件系统中的hdfs://localhost:9000/input/docs目录中新建了一个people.txt文件,并在该文件中添加了“123456”的字样,只要没有特殊说明,下面程序的输入(即args[0])均为该文件。若没有添加该文件,可前往 http://localhost:9870/ 的GUI界面进行添加,如图所示:
这样,就可以开始代码的编写了注意:以下的代码是作者使用的Windows系统下的IDEA工具进行编写和运行,其中的args[0]、args[1]选项本来是用命令
java xxx args[0] args[1]
来进行运行。但是在IDEA中,可以在运行配置中手动设置这个值,设置步骤如图所示
此外,在运行下面的代码前,记得要在hadoop/sbin目录下用start-all.cmd启动所有服务,并且保证输入jps时,能看到如下的界面,才能说明服务开启完毕
最后,别忘记去Maven导Hadoop的相关jar包,这里贴出必要jar包的代码,方便读者直接添加依赖
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.3.0</version>
</dependency>
从Hadoop URL读取数据
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.net.URL;
public class URLCat {
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) {
InputStream in = null;
try {
in = new URL(args[0]).openStream();
IOUtils.copyBytes(in,System.out,4096,false);
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(in);
}
}
}
结果如图所示(为了更好地显示结果,关闭了日志功能):
通过FileSystem API读取数据
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class FileSystemCat {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in,System.out,4096,false);
} catch (IOException e) {
e.printStackTrace();
}finally {
IOUtils.closeStream(in);
}
}
}
结果如图所示:
通过seek()方法,显示数据两次
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.net.URI;
public class FileSystemDoubleCat {
public static void main(String[] args) throws IOException {
String url = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(url),conf);
FSDataInputStream in =null;
try {
in = fs.open(new Path(url));
IOUtils.copyBytes(in,System.out,4096,false);
in.seek(0);
IOUtils.copyBytes(in,System.out,4096,false);
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(in);
}
}
}
结果如图所示:
将本地文件复制到Hadoop文件系统
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import java.io.*;
import java.net.URI;
public class FileCopyWithProgress {
public static void main(String[] args) throws IOException {
String localSrc = args[0];
String dst = args[1]; // args[1]为输出,即目标文件
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst),conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {
@Override
public void progress() {
System.out.println(".");
}
});
IOUtils.copyBytes(in,out,4096,true);
}
}
运行结果如图(这里将源文件设置为跟people.txt一样的文件,目标文件则为people2.txt):
展示文件状态信息
注意:在运行该段代码前,由于该代码没有main()函数,需要从maven中导入junit包才能进行测试,代码如下:
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
导入包后,该段代码带有@Test注解的方法的左侧均会出现一个带有绿色三角形的图标,点击该图片即可单独运行该方法
package URLReadTwo;
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.*;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import java.net.URI;
import java.io.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.MiniDFSCluster;
public class ShowFileStatusTest {
private MiniDFSCluster cluster;
private FileSystem fs;
@Before
public void setUp() throws IOException {
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster(conf, 1, true, null);
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("/dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
}
@After
public void tearDown() throws IOException {
if( fs != null) {
fs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
//有几个@Test就会做几个测试,应该是标记其后的代码是要执行测试的
@Test(expected = FileNotFoundException.class)
public void ThrowsFileNotFoundForNonExistentFile() throws IOException {
fs.getFileStatus(new Path("no-such-file"));
}
@Test
public void fileStatusForFile() throws IOException {
Path file = new Path("/dir/file");
FileStatus stat = fs.getFileStatus(file);
assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
assertThat(stat.isDir(), is(false));
assertThat(stat.getLen(), is(7L));
//assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short)1));
assertThat(stat.getBlockSize(), is(134217728L));
assertThat(stat.getOwner(), is("lishengda"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
}
@Test
public void fileStatusForDirectory() throws IOException {
Path dir = new Path("/dir");
FileStatus stat = fs.getFileStatus(dir);
assertThat(stat.getPath().toUri().getPath(), is("/dir"));
assertThat(stat.isDir(), is(true));
assertThat(stat.getLen(), is(0L));
// assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));
assertThat(stat.getReplication(), is((short) 0));
assertThat(stat.getBlockSize(), is(0L));
assertThat(stat.getOwner(), is("lishengda"));
assertThat(stat.getGroup(), is("supergroup"));
assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
}
}
显示Hadoop文件系统中一组路径的文件信息
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class ListStatus {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path listedPath : listedPaths) {
System.out.println(listedPath);
}
}
}