天天看点

4.Hadoop分布式文件系统——HDFS

文章目录

  • ​​前言​​
  • ​​HDFS简介​​
  • ​​HDFS架构​​
  • ​​Java接口下的程序​​
  • ​​准备​​
  • ​​从Hadoop URL读取数据​​
  • ​​通过FileSystem API读取数据​​
  • ​​通过seek()方法,显示数据两次​​
  • ​​将本地文件复制到Hadoop文件系统​​
  • ​​展示文件状态信息​​
  • ​​显示Hadoop文件系统中一组路径的文件信息​​

前言

以下示例均来自《Hadoop 权威指南》

HDFS简介

HDFS(Hadoop Distributed File System),分布式文件系统

HDFS,是Hadoop抽象文件系统的一种实现。Hadoop抽象文件系统可以与本地系统、Amazon S3等集成,甚至可以通过Web协议(webhsfs)来操作。HDFS的文件分布在集群机器上,同时提供副本进行容错及可靠性保证。例如客户端写入读取文件的直接操作都是分布在集群各个机器上的,没有单点性能压力。

HDFS架构

HDFS采用了主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组成的。其中NameNode作为主服务器,管理文件系统的命名空间和客户端对文件的访问操作;集群中的DataNode管理存储的数据。

4.Hadoop分布式文件系统——HDFS
4.Hadoop分布式文件系统——HDFS

Java接口下的程序

准备

这里提前在Hadoop的文件系统中的hdfs://localhost:9000/input/docs目录中新建了一个people.txt文件,并在该文件中添加了“123456”的字样,只要没有特殊说明,下面程序的输入(即args[0])均为该文件。若没有添加该文件,可前往 ​​http://localhost:9870/​​ 的GUI界面进行添加,如图所示:

4.Hadoop分布式文件系统——HDFS

这样,就可以开始代码的编写了注意:以下的代码是作者使用的Windows系统下的IDEA工具进行编写和运行,其中的args[0]、args[1]选项本来是用命令​

​java xxx args[0] args[1]​

​来进行运行。但是在IDEA中,可以在运行配置中手动设置这个值,设置步骤如图所示

4.Hadoop分布式文件系统——HDFS
4.Hadoop分布式文件系统——HDFS

此外,在运行下面的代码前,记得要在hadoop/sbin目录下用start-all.cmd启动所有服务,并且保证输入jps时,能看到如下的界面,才能说明服务开启完毕

4.Hadoop分布式文件系统——HDFS

最后,别忘记去Maven导Hadoop的相关jar包,这里贴出必要jar包的代码,方便读者直接添加依赖

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.3.0</version>
        </dependency>      

从Hadoop URL读取数据

import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.net.URL;

public class URLCat {

    static{
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        InputStream in = null;
        try {
            in = new URL(args[0]).openStream();
            IOUtils.copyBytes(in,System.out,4096,false);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeStream(in);
        }
    }
}      

结果如图所示(为了更好地显示结果,关闭了日志功能):

4.Hadoop分布式文件系统——HDFS

通过FileSystem API读取数据

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


import java.io.IOException;
import java.io.InputStream;
import java.net.URI;


public class FileSystemCat {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);
        InputStream in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in,System.out,4096,false);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeStream(in);
        }
    }
}      

结果如图所示:

4.Hadoop分布式文件系统——HDFS

通过seek()方法,显示数据两次

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.net.URI;

public class FileSystemDoubleCat {

    public static void main(String[] args) throws IOException {
        String url = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url),conf);
        FSDataInputStream in =null;
        try {
            in = fs.open(new Path(url));
            IOUtils.copyBytes(in,System.out,4096,false);
            in.seek(0);
            IOUtils.copyBytes(in,System.out,4096,false);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
        }

    }
}      

结果如图所示:

4.Hadoop分布式文件系统——HDFS

将本地文件复制到Hadoop文件系统

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;


import java.io.*;
import java.net.URI;

public class FileCopyWithProgress {

    public static void main(String[] args) throws IOException {
        String localSrc = args[0];
        String dst = args[1]; // args[1]为输出,即目标文件
        InputStream in = new BufferedInputStream(new FileInputStream(localSrc));

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst),conf);
        OutputStream out = fs.create(new Path(dst), new Progressable() {
            @Override
            public void progress() {
                System.out.println(".");
            }
        });

        IOUtils.copyBytes(in,out,4096,true);
    }
}      

运行结果如图(这里将源文件设置为跟people.txt一样的文件,目标文件则为people2.txt):

4.Hadoop分布式文件系统——HDFS

展示文件状态信息

注意:在运行该段代码前,由于该代码没有main()函数,需要从maven中导入junit包才能进行测试,代码如下:

<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>      

导入包后,该段代码带有@Test注解的方法的左侧均会出现一个带有绿色三角形的图标,点击该图片即可单独运行该方法

package URLReadTwo;

import static org.junit.Assert.*;

import static org.hamcrest.CoreMatchers.*; 

import org.junit.Before;

import org.junit.After;

import org.junit.Test;

import java.net.URI;

import java.io.*;



import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.hdfs.MiniDFSCluster;



public class ShowFileStatusTest {

private MiniDFSCluster cluster;

private FileSystem fs;


@Before

public void setUp() throws IOException {

Configuration conf = new Configuration();

if (System.getProperty("test.build.data") == null) {

System.setProperty("test.build.data", "/tmp");

}

cluster = new MiniDFSCluster(conf, 1, true, null);

fs = cluster.getFileSystem();

OutputStream out = fs.create(new Path("/dir/file"));

out.write("content".getBytes("UTF-8"));

out.close();

}



@After

public void tearDown() throws IOException {

if( fs != null) {

fs.close();

}

if (cluster != null) {

cluster.shutdown();

}

}


//有几个@Test就会做几个测试,应该是标记其后的代码是要执行测试的

@Test(expected = FileNotFoundException.class)

public void ThrowsFileNotFoundForNonExistentFile() throws IOException {

fs.getFileStatus(new Path("no-such-file"));

}



@Test

public void fileStatusForFile() throws IOException {

Path file = new Path("/dir/file");

FileStatus stat = fs.getFileStatus(file);

assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));

assertThat(stat.isDir(), is(false));

assertThat(stat.getLen(), is(7L));

//assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));

assertThat(stat.getReplication(), is((short)1));

assertThat(stat.getBlockSize(), is(134217728L));

assertThat(stat.getOwner(), is("lishengda"));

assertThat(stat.getGroup(), is("supergroup"));

assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));

}



@Test

public void fileStatusForDirectory() throws IOException {

Path dir = new Path("/dir");

FileStatus stat = fs.getFileStatus(dir);

assertThat(stat.getPath().toUri().getPath(), is("/dir"));

assertThat(stat.isDir(), is(true));

assertThat(stat.getLen(), is(0L));

// assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis())));

assertThat(stat.getReplication(), is((short) 0));

assertThat(stat.getBlockSize(), is(0L));

assertThat(stat.getOwner(), is("lishengda"));

assertThat(stat.getGroup(), is("supergroup"));

assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));

}

}      

显示Hadoop文件系统中一组路径的文件信息

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;


public class ListStatus {
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri),conf);

        Path[] paths = new Path[args.length];
        for (int i = 0; i < paths.length; i++) {
            paths[i] = new Path(args[i]);
        }
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path listedPath : listedPaths) {
            System.out.println(listedPath);
        }
    }
}