天天看點

hadoop權威指南筆記

combiner函數 叢集的可用帶寬限制了MapReduce作業數量,是以應該盡量避免兩者之間資料傳輸是有利的
map輸出指定一個combiner,對資料量進行減少,最後再傳遞給reduce

core-site.xml
<property>
    <name>fs.defaultFS</name>   用于設定預設檔案系統,指定namenode主機以及端口
    <value>hdfs://master:9000</value>    
</property>
因為隻有2個Slave,是以dfs.replication的值設為2。
	dfs.namenode.replication.min  設為1,則寫操作就會成功,最終會保證達到replication數2
	    這裡為異步操作
	第一個複本在用戶端(用戶端在叢集外,則機架随機一個),2.3都随機同一個機架,
	都會避免快滿或忙的節點 其它的放在叢集中随機選擇的節點,系統盡量避免同一機架


檔案hdfs-site.xml
<property>
    <name>dfs.replication</name>   datanode個數
    <value>2</value>
</property>





hdfs dfs -copyFromLocal /home/hadoop/input/1.txt \ hdfs://localhost/user/root/input
等價于         因為core-site.xml已經指定了hdfs://master:9000
hdfs dfs -copyFromLocal /home/hadoop/input/1.txt input

hdfs dfs -copyToLocal output /home/hadoop


hadoop fs -ls output
權限  備份數  ...



從檔案系統讀取資料;

InputStream in = null;
try{
	in = new URL("hdfs://host/path").openStream();  //java.new.URL
	//process in
}finally{
	IOUtils.closeStream(in);
}

public class URLCat{  //每個jvm,setURLStreamHandlerFactory隻能調用一次
	static { //通過FsUrlStreamHandler執行個體擷取檔案
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}
	public static void main(String args[]){
		InputStream in = null;
		try{
			in = new URL(args[0]).openStream();  //java.new.URL
			IOUtils.copyBytes(in,System.out,4096,false);
		}finally{
			IOUtils.closeStream(in);
		}
	}
}  //hadoop URLCat hdfs://localhost/user/root/input/1.txt




FileSystem 擷取方法                                    core-site.xml配置檔案
public static FileSystem get(Configration conf);  //conf從設定配置檔案讀取類路徑
public static FileSystem get(URI uri,Configration conf);//若未指定,傳回預設檔案系統
public static FileSystem get(URI uri,Configration conf,String user);//給指定使用者通路

public static LocalFileSystem getLocal(Configration conf);//擷取本地檔案系統

通過FileSystem執行個體調用open擷取輸入流
public FSDataInputStream open(Path f);   //bufferSize default 4kb
public abstract FSDataInputStream open(Path f,int bufferSize);

直接用FileSystem以标準格式輸出顯示檔案系統中檔案
public class FileSystemCat{
	public static void public static void main(String[] args) {
		String uri = args[0];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(uri),conf);
		InputStream in = null;
		try{
			in = fs.open(new Path(uri));  //open(),傳回對象為FSDataInputStream
			IOUtils.copyBytes(in,System.out,4096,false);
		}finally{
			IOUtils.closeStream(in);
		}
	}
}// hadoop FileSystemCat hdfs://localhost/user/root/input/1.txt
// in.seek(0);傳回流的起點 ,支援随機通路


從檔案系統寫入資料;
public FSDataInputStream create(Path f );
//有多個重載版本,指定是否強制覆寫、備份數量、緩沖區大小、檔案塊大小、檔案權限
//需要先判定 父目錄是否存在eixsts()
追加資料
public FSDataInputStream append(Path f);  //并發所有檔案系統支援,如S3不支援

public class FileCopyWithProgress{
	public static void main(String[] args) {
		String loaclSrc = args[0];
		String dst = args[1];
		InputStream in = new BufferedInputStream(new FileInputStream(loaclSrc));
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst),conf);
		OutputStream out  = fs.create(new Path(dst),new Progress(){
			public void progress(){
				System.out.print("callback!");
			}
		});//FSDataInputStream,不允許在檔案中定位,即隻允許順序輸入或末尾追加資料。
		IOUtils.copyBytes(in,out,4096,true);
	}
}//hadoop FileCopyWithProgress input/docs/1.txt hdfs://localhost/user/root/input/1.txt

目錄
public boolean mkdirs(Path f);//建立所有必要但是沒有的父目錄

查詢檔案系統
1.檔案中繼資料:FileStatus  
	封裝了檔案系統中檔案和目錄的中繼資料,包括長度、塊大小、複本、修改時間、所有者、權限
getFileStatus(Path f);)

2.列出檔案
public FileStatus[] listStatus(Path f);
public FileStatus[] listStatus(Path f,PathFilter filter);
public FileStatus[] listStatus(Path[] files);
public FileStatus[] listStatus(Path[] files,PathFilter filter);

public class ListStatus{
	public static boolean downLoadHDFS(String hdfsSrc, String localDst) throws IOException {
        Configuration conf = new Configuration();
        Path dstpath = new Path(hdfsSrc);
        int i = 1;
        FileSystem fs = FileSystem.get(URI.create(hdfsSrc), conf);
        try {
            String subPath = "";
            FileStatus[] fList = fs.listStatus(dstpath);
            for (FileStatus f : fList) {
                if (null != f) {
                    subPath = new StringBuffer()
                            .append(f.getPath().getParent()).append("/")
                            .append(f.getPath().getName()).toString();
                    if (f.isDir()) {
                        downLoadHDFS(subPath, localDst);
                    } else {
                        System.out.println("/t/t" + subPath);// hdfs://54.0.88.53:8020/
                        Path dst = new Path(subPath);
                        i++;
                        try {
                            Path Src = new Path(subPath);
                            String Filename = Src.getName().toString();
                            String local = localDst + Filename;
                            Path Dst = new Path(local);
                            FileSystem localFS = FileSystem.getLocal(conf);
                            FileSystem hdfs = FileSystem.get(URI
                                    .create(subPath), conf);
                            FSDataInputStream in = hdfs.open(Src);
                            FSDataOutputStream output = localFS.create(Dst);
                            byte[] buf = new byte[1024];
                            int readbytes = 0;
                            while ((readbytes = in.read(buf)) > 0) {
                                output.write(buf, 0, readbytes);
                            }
                            in.close();
                            output.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                            System.out.print(" download failed.");
                        } finally {
                        }
                    }
                }
            }
        } catch (Exception e) {
        } finally {
            System.out.println("the number of files is :" + i);
        }
        return true;
    }
}

public boolean delete(Path f,boolean recursive);
若f為檔案或者空目錄,recursive會被忽略,隻有在recursive為true時,非空目錄及其内容才會被删除否則抛異常


3.檔案模式
public class regex implements PathFilter{
	private final String regex;
	PathFilter(String regex){
		this.regex= regex;
	}
	public boolean accept(Path path){
		return !path.toString().matches(regex);
	}
}
           
hadoop權威指南筆記
hadoop權威指南筆記
删除資料
public boolean delete(Path f,boolean recursive);
若f為檔案或者空目錄,recursive會被忽略,隻有在recursive為true時,非空目錄及其内容才會被删除否則抛異常


資料流,通過網絡拓撲,将資料塊與用戶端距離進行排序,通過對InputStream反複調用read()方法,将datanode
傳輸到用戶端,當到達末端時,InputStream關閉連接配接,然後尋找下一塊最佳datanode。
距離有4種,同一程序、同一機架、同一資料中心、不同資料中心  1,2,4,6

一緻模型  hflush()僅保證在datanode記憶體中,減輕hadoop負載  hsync()強制寫入磁盤
  檔案系統保證建立檔案立馬可見,但是寫入檔案内容不一定立刻可見,即使已經重新整理并存儲(僅保證在datanode記憶體中)
總之正在寫入的塊對其它reader不可見。


并行複制 hadoop distcp file1 file2                   distcp本質上是一個map作業
 		hadoop distcp dir1 dir2    dir2不存在則會建立,并且可以指定多個源路徑
                                  dir不存在,則目錄dir1會複制到dir2目錄下 /dir2/dir1
-overwrite   -update		僅當更新發送變化的時候才複制
保持負載均衡,保證每個節點20個map   通過 -m 指定




#yarn 叢集資源管理系統
ResourceManager
NodeManager

資源請求
	當請求多個容器時,可以指定每個容器需要的計算資源數量(CPU、RAM)還可以指定對容器的本地限制
  可以限制容器申請指定節點或機架或任何位置的容器,有時當限制無法被滿足時,會自動放寬
  	yarn應用可以在運作中的任意時刻提出資源申請,動态調整

應用生命周期   幾秒到幾個月都有
模型	
	1.最簡單的是,一個使用者作業對應一個應用
	2.每個工作流或每個使用者對話對應一個應用,效率比1高,容器在作業之間可以重用 spark
	3.多使用者共享一個長期運作的應用作為協調者的角色運作(application master)

建構yarn應用
	MapReduce1 jobtracter負責作業排程和任務進度監控,       tasktracter負責工作  Slot
	MapReduce2 資料總管、applicationmaster、時間軸伺服器     節點管理器         容器

MapReduce1 每個tasktracter都配置有固定長度map,reduce的Slot,且不能重用
MapReduce2 可重用 ,将MapReduce變為了yarn應用

YARN有3種排程器:FIFO排程器、容量、公平。
	FIFO排程器:簡單易懂、不需任何配置,但是不适合共享叢集,大應用會占用叢集所有資源
	容量:能保證長時間作業能及時完成,同時小作業能在合理時間得到結果,與FIFO相比,大作業
	      執行時間更長點
	公平:(特性同容量第一行)不需要預留一定量的資源,


容量排程器配置
	将資源分為多個隊列,每個隊列占用的資源比例不同,并且可以設定 最大資源,隊列可以動态申請
	超過資源比例,但是不能超過最大資源限制。
           
hadoop權威指南筆記

 capacity-scheduler.xml

hadoop權威指南筆記
hadoop權威指南筆記

prod、eng合法    root.dev.eng與dev.eng不合法

公平任務排程器

    fair-scheduler.xml

hadoop權威指南筆記

這裡4:6配置設定資源符合公平排程原則,指的是隊列的公平性。

hadoop權威指南筆記
hadoop權威指南筆記
hadoop權威指南筆記

支援搶占功能 yarn.scheduler.fair.preemption=true

hadoop權威指南筆記

延時排程:可以 提高容器效率(不超過幾秒)   容器、公平都支援

主導資源公平性,如A需要大量CPU少量RAM,B需大量RAM,少CPU

DRF

繼續閱讀