HDFS 的 API 操作
使用url方式通路資料(了解)
@Test
public void urlHdfs() throws IOException {
//1.注冊url
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
//2.擷取hdfs檔案的輸入流
InputStream inputStream=new URL("hdfs://hadoop101:8020/a.txt").openStream();
//3.擷取本地檔案的輸出流
OutputStream outputStream= new FileOutputStream(new File("E:\\hello.txt"));
//4.實作檔案的拷貝
IOUtils.copy(inputStream,outputStream);
//5.關流
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
}
使用檔案系統方式通路資料(掌握)
擷取 FileSystem 的幾種方式
①
/*
擷取FileSystem,方式1
*/
@Test
public void getFileSystem1() throws IOException{
//1.建立Configuration對象
Configuration configuration=new Configuration();
//2.設定檔案系統類型
configuration.set("fs.defaultFS","hdfs://hadoop101:8020");
//3.擷取指定的檔案系統
FileSystem fileSystem= FileSystem.get(configuration);
//4.輸出
System.out.println(fileSystem);
}
②用的次數比較多
/*
擷取FileSystem,方式2
ctrl+alt+v自動補全傳回值
*/
@Test
public void getFileSystem2() throws URISyntaxException, IOException, InterruptedException {
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
System.out.println(fileSystem);
}
③
/*
擷取FileSystem,方式3
ctrl+alt+v自動補全傳回值
*/
@Test
public void getFileSystem3() throws IOException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://hadoop101:8020","root");
//3.擷取指定的檔案系統
FileSystem fileSystem= FileSystem.newInstance(configuration);
//4.輸出
System.out.println(fileSystem.toString());
}
④
/*
擷取FileSystem,方式4
ctrl+alt+v自動補全傳回值
*/
@Test
public void getFileSystem4() throws URISyntaxException, IOException, InterruptedException {
FileSystem fileSystem= FileSystem.newInstance(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
System.out.println(fileSystem);
}
周遊 HDFS中所有檔案
/*
hdfs檔案的周遊
*/
@Test
public void listFiles() throws URISyntaxException, IOException, InterruptedException {
//1.擷取FileSystem執行個體
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
//2.調用方法listFiles擷取/目錄下的所有檔案資訊
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path("/"), true);
//3.周遊疊代器
while (iterator.hasNext()){
LocatedFileStatus fileStatus = iterator.next();
//擷取檔案的絕對路徑:hdfs://hadoop101:/xxx
System.out.println(fileStatus.getPath()+"----"+fileStatus.getPath().getName());
//檔案的block資訊
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
System.out.println("block數目:"+blockLocations.length);
}
}
輸出:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CO4gTN4cjNxMTO2ADN4QmMzYzXyQTMwgTMzIzLclDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
HDFS 上建立檔案夾
/*
hdfs建立檔案夾
*/
@Test
public void mkdirsTest() throws URISyntaxException, IOException, InterruptedException {
//1.擷取FileSystem執行個體
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(),"root");
//2.擷取檔案夾
boolean bl=fileSystem.mkdirs(new Path("/aaa/bbb/ccc/a.txt"));
//fileSystem.create(new Path("/aaa/bbb/ccc/a.txt"));
System.out.println(bl);
//3.關閉FileSystem
fileSystem.close();
}
下載下傳檔案
/*
檔案下載下傳2:使用方法copyToLocalFile下載下傳到本地E盤下的bbb.txt
*/
@Test
public void downloadFile2() throws URISyntaxException, IOException, InterruptedException {
//1.擷取FileSystem
FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration(),"root");
fileSystem.copyToLocalFile(new Path("/a.txt"),new Path("E://bbb.txt"));
fileSystem.close();
}
/*
檔案下載下傳
*/
@Test
public void downloadFile() throws URISyntaxException, IOException {
//1.擷取FileSystem
FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration());
//2.擷取hdfs的輸入流
FSDataInputStream inputStream = fileSystem.open(new Path("/a.txt"));
//3.擷取本地路徑的輸出流
FileOutputStream outputStream = new FileOutputStream("E://a.txt");
//4.檔案的拷貝
IOUtils.copy(inputStream,outputStream);
//5.關閉流
IOUtils.closeQuietly(inputStream);
IOUtils.closeQuietly(outputStream);
fileSystem.close();
}
HDFS 檔案上傳
/*
檔案的上傳
*/
@Test
public void uploadFile() throws URISyntaxException, IOException, InterruptedException {
FileSystem fileSystem=FileSystem.get(new URI("hdfs://hadoop101:8020"),new Configuration(),"root");
fileSystem.copyFromLocalFile(new Path("E://hello.txt"),new Path("/"));
fileSystem.close();
}
小檔案合并
由于 Hadoop 擅長存儲大檔案,因為大檔案的中繼資料資訊比較少,如果 Hadoop 叢集當中有大
量的小檔案,那麼每個小檔案都需要維護一份中繼資料資訊,會大大的增加叢集管理中繼資料的
記憶體壓力,是以在實際工作當中,如果有必要一定要将小檔案合并成大檔案進行一起處理
在我們的 HDFS 的 Shell 指令模式下,可以通過指令行将很多的 hdfs 檔案合并成一個大檔案下
載到本地
cd /export/servers
hdfs dfs -getmerge /*.xml ./hello.xml
既然可以在下載下傳的時候将這些小檔案合并成一個大檔案一起下載下傳,那麼肯定就可以在上傳的
時候将小檔案合并到一個大檔案裡面去
/*
小檔案的合并
*/
@Test
public void mergeFile() throws URISyntaxException, IOException, InterruptedException {
//1.擷取FileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop101:8020"), new Configuration(), "root");
//2.擷取hdfs大檔案的輸出流
FSDataOutputStream outputStream = fileSystem.create(new Path("/big_txt.txt"));
//3.擷取一個本地檔案系統
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
//4.擷取本地檔案夾下的所有檔案的詳情
FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("E:\\input"));
//5.周遊每個檔案,擷取每個檔案的輸入流
for (FileStatus fileStatus:fileStatuses){
FSDataInputStream inputStream=localFileSystem.open(fileStatus.getPath());
//6.将小檔案的資料複制到大檔案
IOUtils.copy(inputStream,outputStream);
IOUtils.closeQuietly(inputStream);
}
IOUtils.closeQuietly(outputStream);
localFileSystem.close();
fileSystem.close();
}
輸出:
裡面的内容是:在E盤下input檔案夾下的5個txt檔案的總和
完成本地小檔案的合并,上傳
HDFS的高可用機制
在Hadoop 中,NameNode 所處的位置是非常重要的,整個HDFS檔案系統的中繼資料資訊都由
NameNode 來管理,NameNode的可用性直接決定了Hadoop 的可用性,一旦NameNode程序
不能工作了,就會影響整個叢集的正常使用。
在典型的HA叢集中,兩台獨立的機器被配置為NameNode。在工作叢集中,NameNode機器中
的一個處于Active狀态,另一個處于Standby狀态。Active NameNode負責群集中的所有用戶端
操作,而Standby充當從伺服器。Standby機器保持足夠的狀态以提供快速故障切換(如果需
要)。
Hadoop的聯邦機制(Federation)
HDFS Federation是解決namenode記憶體瓶頸問題的水準橫向擴充方案。
Federation意味着在叢集中将會有多個namenode/namespace。這些namenode之間是聯合的,
也就是說,他們之間互相獨立且不需要互相協調,各自分工,管理自己的區域。分布式的
datanode被用作通用的資料塊存儲儲存設備。每個datanode要向叢集中所有的namenode注
冊,且周期性地向所有namenode發送心跳和塊報告,并執行來自所有namenode的指令。
Federation一個典型的例子就是上面提到的NameNode記憶體過高問題,我們完全可以将上面部分
大的檔案目錄移到另外一個NameNode上做管理.更重要的一點在于,這些NameNode是共享集
群中所有的DataNode的,它們還是在同一個叢集内的**。**
這時候在DataNode上就不僅僅存儲一個Block Pool下的資料了,而是多個(在DataNode的datadir
所在目錄裡面檢視BP-xx.xx.xx.xx打頭的目錄)。
概括起來:
多個NN共用一個叢集裡的存儲資源,每個NN都可以單獨對外提供服務。
每個NN都會定義一個存儲池,有單獨的id,每個DN都為所有存儲池提供存儲。
DN會按照存儲池id向其對應的NN彙報塊資訊,同時,DN會向所有NN彙報本地存儲可用資源
情況。
HDFS Federation不足:
HDFS Federation并沒有完全解決單點故障問題。雖然namenode/namespace存在多個,但是從
單個namenode/namespace看,仍然存在單點故障:如果某個namenode挂掉了,其管理的相
應的檔案便不可以通路。Federation中每個namenode仍然像之前HDFS上實作一樣,配有一個
secondary namenode,以便主namenode挂掉一下,用于還原中繼資料資訊。
是以一般叢集規模真的很大的時候,會采用HA+Federation的部署方案。也就是每個聯合的
namenodes都是ha的。