天天看點

hadoop詳細學習筆記一.Hadoop第一天1.3 hdfs架構的簡單原理:二.hadoop第二天三.hadoop第三天五.hadoop第五天六.hadoop第六天

hadoop

一.Hadoop第一天

1.1 大資料引言

1、什麼是大資料?

對大量資料集檔案處理的過程 b—k---m—G---T—PB—EB—ZB 資料 換算機關 1024

2、大資料的特點?

(1)Volume大量 資料量非常大 TB級别以上的

(2)Variety多樣 資料格式多樣 結構化資料:資料庫中的資料 半結構化資料:json mongdb 非結構化資料:圖檔,視訊 ,音頻。

(3)、Velocity快速 資料的處理一定要快

(4)、Value價值 在大量資料中分析出自己有價值的資料

1.2 hadoop核心設計:

(1)hdfs 大資料檔案存儲問題

HDFS(Hadoop Distributed File System)Hadoop 分布式檔案系統:

基于流資料模式通路

就是可以位元組序列化的資料,java.io.Serializable接口

分布式檔案系統處理的資料必須是流資料,可以寫IO操作的資料

它是以128MB的資料塊 存儲檔案(在Hadoop 1.x版本中是以64MB的資料塊存儲檔案的)

其中每一個存儲節點上都有一個DataNode程序,由NameNode來進行協調。

(2)mapreduce 程式設計模型 計算架構

1.3 hdfs架構的簡單原理:

NameNode: 是整個HDFS叢集的總入口,存儲着HDFS的叢集的檔案中繼資料(如:client上傳檔案的檔案名 副本數 塊數等相關資訊)。

  • DataNode: 是真正用來負責存儲資料的節點,一個DataNode就是一個真實的實體主機。
  • Block: 資料塊,為了能通過多個節點儲存大資料集,HDFS将大資料集檔案切分成一塊塊的資料塊,在現有hadoop2版本中預設一個塊大小為128M。

metadata:元檔案 (ip 檔案名 block(預設128M) 副本 配置設定到那個datanode)

1.4 hadoop安裝:

1、安裝centos7、x虛拟機,并且啟動

2、輸入hostname 檢視目前主機名字

3、使用vim /etc/hostname 修改主機名字

4、添加主機名字并且和ip映射 vim /etc/hosts

格式 ip(目前的ip位址) centos(主機名字)

5、重新centos系統 reboot 或者關閉虛拟機 重新連接配接

6、必須安裝jdk 并且配置環境變量

7、hadoop下面的目錄及其内部檔案結構

bin ----------可執行的二進制腳本檔案 etc/hadoop目錄 hadoop系統配置檔案所在的目錄

hadoop-env.sh -----------配置環境

core-site.xml ----------配售hdfs叢集的核心配置

hdfs-site.xml ----------用來對hdfs檔案系統做配置的

share -----------用來存放hadoop的依賴jar第三方jar目錄

lib ----------用來存放hadoop使用核心庫檔案

8、配置hadoop的環境變量 /etc/profile

在結尾加上export HADOOP_HOME=/usr/hadoop-2.9.2

export PATH=PATHJAVA_HOME/bin:HADOOP_HOME/bin:HADOOP_HOME/sbin

9、配置core-site.xml

vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml 加入如下配置:

(就是配置namenode的全局入口====給哪個機器配置namenode 相當于配置位址)

<configuration>
  <!--配置hdfs檔案系統預設名稱-->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://hadoop(主機名):9000</value>
  </property>
</configuration>
           

10、配置hdfs-site.xml

vim /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml 加入如下配置: (配置檔案的副本個數

預設是三個)

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
           

11、配置slaves檔案(将namenode和dataname聯系起來 啟動了namenode namenode就啟動了datanode)

vim /usr/hadoop-2.9.2/etc/hadoop/slaves 加入如下配置: hadoop (目前主機名)

12、格式化hdfs

hdfs namenode -format (僅僅是第一次使用需要格式化)

注意:這裡的格式化是格式成hadoop可以識别的檔案系統,比如我們買了一塊硬碟我們需要格式化成windows或者mac,linux系統識别的檔案系統,才能使用這個檔案系統。

關閉防火牆

systemctl stop firewalld systemctl disable firewalld

二.hadoop第二天

2.1 hadoop中提供的兩種腳本

1、hadoop對hdfs叢集的管理提供了兩種腳本

a、hadoop-daemon.sh start namenode/dataname/secondarynamenode 本地啟動腳本

作用:對叢集中的單個節點進行操作

b、start-dfs.sh 叢集啟動腳本 作用:對進群中所有節點統一操作 指令:start-dfs.sh

可以在任意節點使用,如果在從節點使用,會找到從節點中core-size配置檔案中namenode的入口(就是namenode的位址,他在哪個伺服器上),然後嘗試登入到此namenode伺服器上,再通過namenode中的slaves配置檔案找到所有的datanode伺服器機器名字

通過事先配好的映射找到對應的伺服器ip登入他們,進而打開所有的datanode伺服器

本地啟動腳本,在叢集環境中需要去啟動每一台伺服器,工作量較大。

叢集啟動腳本,整個叢集隻需要啟動一次。啟動過程:随即在一台伺服器上輸入start-dfs.sh指令,然後會通過core-size.xml配置檔案中配置namendoe的位址,去找NameNode的所在的伺服器,會登入此伺服器啟動namenode,然後根據此台伺服器中的slaves配置檔案(namenode與datanode的關聯檔案)中的伺服器名稱,依次映射對應的ip位址找到對應的datanode伺服器,登入啟動所有datanode伺服器。

2.2 修改hdfs預設存儲位置data

通過檢視日志得知namenode資料datanode資料預設都是存放在/tmp/tmp/hadoop-root/dfs下(這裡是臨時的檔案存儲目錄),這對于我們來說是不安全的,系統可能會定期的清除目錄中的檔案,是以為了保證資料的安全完整性,我們需要修改此檔案的預設存儲位置。

2.2.1、修改hdfs預設資料的位置需要修改core-site.xml檔案配置 加入去下配置即可

<property>
	<name>hadoop.tmp.dir</name>
	<value>/root/hadoop-2.9.2/data</value>
</property>
           

2.3 SSH工作原理和免密登入

解決啟動hsfs叢集時的登入驗證密碼問題,需要使用ssh協定的第二種方式

2.3.1、ssh協定:

第一種方式:基于密碼的安全驗證 第二種方式:基于密匙的安全驗證

2.3.2、ssh秘鑰原理說明:

存在兩個伺服器A,B。A通路登入B,A需要生成一組秘鑰(公鑰和私鑰),A将公鑰配置在B的信任檔案中,在A要登入B的時候,A會帶着公鑰去找B,B拿到公鑰後先判斷信任檔案中是否含有此公鑰,如果沒有,會拒絕A的登入,如果有會根據此公鑰和随機産生的字元串生成新的資詢再傳給A,A根據私鑰将質詢破解,得到字元串再傳回給B,如果B此次得到的字元串和随機生成的字元串完全相同,則A成功登入B。

2.3.3、配置SSH免密登入

  1. 生成ssh秘鑰對 ssh-keygen -t rsa 然後回車幾次就可以啦
  2. 檢視秘鑰對生成位置 ls /root/.ssh 會發現在home目錄中生成了兩個檔案

    id_rsa(私鑰) id_rsa.pub(公鑰)

  3. 将公鑰加入另一台機器的受信清單中 ssh-copy-id hadoop(主機名) cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys (和上面指令一樣)
  4. 再次檢視/root/.ssh 目錄 多出兩個檔案其中authorized_keys就是存放公鑰清單檔案 authorized_keys id_rsa id_rsa.pub known_hosts
  5. 檢測是否配置成功 ssh hadoop 不需要輸入密碼即可

2.3.4、Shell基本指令總結

檢視目錄:hdfs dfs -ls / 上傳檔案:hdfs dfs -put aa.txt / 建立檔案(疊代建立):hdfs

dfs -mkdir -p /bbb/cccc 檢視檔案内容:hdfs dfs -cat /aa.txt 追加檔案内容:hdfs

dfs -appendToFile bb.txt /aa.txt hdfs中複制檔案: hdfs dfs -cp /aa.txt

/datas 下載下傳檔案到本地:hdfs dfs -get /aa.txt /root/down.txt

查找檔案名的目錄:hdfs dfs -find / -name “aa.txt” 将hdfs檔案移動到hdfs另一個位置:hdfs

dfs -mv /bb.txt /datas/bb.txt

2.4 java操作HDFS

2.4.1. 引入依賴

<properties>
  <hadoop.version>2.9.2</hadoop.version>
</properties>
<dependencies>
  <!--hadoop公共依賴-->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <!--hadoop client 依賴-->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
  </dependency>
  <!--junit-->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
  </dependency>
</dependencies>
           

2.4.2 擷取hdfs用戶端

public class TestHDFS {
    private FileSystem fileSystem;  //hdfs用戶端對象
    @Before
    public void before() throws IOException {
        //hadoop檔案系統的權限設定為root  包裝僞裝使用者
        System.setProperty("HADOOP_USER_NAME","root");
        //或者在hdfs-size.xml中配置  修改windows對hdfs的權限  true是預設不支援權限
            3
        //用來對core-site.xml hdfs-site.xml進行配置
        Configuration conf = new Configuration();
        //連接配接hdfs
        conf.set("fs.defaultFS","hdfs://10.15.0.4:9000");
        //設定上傳檔案的副本集
        conf.set("dfs.replication","1");
        fileSystem =  FileSystem.get(conf);
    }
    @After
    public void close() throws IOException {
        fileSystem.close();
    }
}
           

2.4.3 上傳檔案到hdfs

@Test
public void testUpload() throws IOException {
    //流入流 讀取流
	FileInputStream is=new FileInputStream("計算機中要上傳檔案的絕對路徑")
    Path path = new Path("上傳到hdfs上面的哪個檔案");
    FSDataOutputSytream os = fileSystem.create(path);
    //參數1:讀取流、參數2 列印流、參數3:位元組數、參數4:是否關流
    IOUtils.copyBytes(is,"os",1024,true);
}
           

2.4.4 hdfs下載下傳檔案`

// 1.第一種方式
@Test
public void testDownload() throws IOException {
	Path source = new Path("hdfs上面要下載下傳檔案的路徑");
    Path path = new  Path("要下載下傳到本地的位置");
   	//參數一:是否删除源檔案  參數二:hdfs上下載下傳的檔案 參數三:下載下傳到本地檔案的位置 參數四:windows相容
    fileSystem.copyTolocalFile(flase,source,path,true);
}
// 2.第二種方式
@Test
public void testDownload1() throws IOException {
	Path path = new  Path("在hdfs上面要下載下傳的檔案");
    FSDataInputStream in = fileSystem.open(path);
    FileOutputStream fileOutputStream = new FileOutputStream("要下載下傳到本機的絕對路徑");
    //參數一:讀取流  參數二:列印流  參數三:位元組數  參數四:是否關流
    IOUtils.copyBytes(in,fileOutputStream,1024,true);
}
           

2.4.5 展示hdfs目錄和檔案

@Test
public void testListDirs() throws IOException {
  Path path = new Path("/");
  FileStatus[] fileStatuses = fileSystem.listStatus(path);
  for (FileStatus fileStatus : fileStatuses) {
    System.out.println(fileStatus.isDirectory()+" "+fileStatus.getPath());
  }
}
           

三.hadoop第三天

3.1hadoop體系下的配置檔案

hadoop配置檔案解析順序 從高到低

  • javacode中Confriguration 設定會覆寫core-default.xml hdfs-default.xml中内容
  • javaclient jar 中的預設 core-default.xml hdfs-default.xml中内容 (預設dataname有三個)
  • hadoop安裝包自定義配置 hadoop etc hadoop core-size.xml hdfs-size.xml内容會覆寫hadoop中預設内容
  • hadoop中預設的core-default.xml hdfs-default.xml中内容(預設dataname有三個)

3.2 NameNode 和 SecondaryNameNode

3.2.1、namenode的中繼資料資訊到底存放在哪裡?

由于client對hdfs操作過于頻繁,操作的檔案都是随機操作,是以為了提供hdfs的叢集效率,hdfs叢集将 namenode中資料(中繼資料)始終放在記憶體中。

特點:不安全,記憶體斷電會立即丢失資料。

3.2.2、如何對NameNode資料進行持久化?

FsImage:儲存NameNode目前這一時刻資料狀态 相當于快照

Editslog:日志檔案,隻記錄client到hdfs寫操作(二級制檔案)

用來記錄使用者的寫操作。每當中繼資料有更新或者添加中繼資料時,修改記憶體中的中繼資料并追加到Edits中。

3.2.3、如何解決Edits日志檔案越來越大的問題?

secondaryNameNode:用來定期的完成對FsImage和Edits日志的操作。

規定是一個小時,或者用戶端進行1000000次操作時進行合并。

面試題:secondaryNameNode有什麼作用,沒有可不可以?

答:可以,但是會使edist日志檔案越來越大,效率大大降低。

NameNode在關閉的時候會預設進行edits和fsimage的合并,每次啟動NameNdoe,他都會滾動一次

産生新的edits1… 新的用戶端記錄檔會存儲在新産生的edits1中

3.3 HDFS的HA叢集原理分析:

3.3.1、簡單HDFS叢集中存在的問題:

單節點問題,單節點自動故障轉移

3.3.2、如何解決NameNode單節點問題?

a、找另一個namenode備份原有的namenode資料

b、如何解決叢集中的腦裂問題(一個叢集中多個管理者資料不一緻這種情況稱之為------腦裂)

3.3.3、如何解決啟動多個NameNode時保證同一時刻隻有一個namenode工作,避免腦裂問題?

QJM使用Zookeeper 完成高可用

面試題:在HA叢集中如果ZK與NameNode(advice)出現網絡上延遲這種情況,ZK會自動切換NameNode(standby)為活躍節點,這個時候叢集就出現了多個活躍的NameNode(advice) 也就意味着出現了腦裂的問題!怎麼解決?

提示:考察的是JournalNode中的兩種功能: 1、同步資料 2、隔離機制

答案:JournalNode有一下兩種功能:

1、負責NameNode中edits的同步資料。

2、JournalNode的隔離機制,因為它是唯一個連接配接着兩個NameNode的元件,是以他會第一個發現。為了保證這一時刻隻有一個NameNode(advice)活躍,他會使用ssh登入到NameNode節點上,使用kill指令殺死NameNode

四.hadoop第四天

4.1高可用HDFS具體步驟:

4.1.1、首先搭建ZK叢集,安裝zk tar -zxvf zookeeper-3.4.12.tar.gz,同步zk2 zk3 scp -r zookeeper-3.4.12 [email protected]:/root/ ,在每一個節點上建立zk資料目錄mkdir /root/zkdata,在每一個節點存在zk資料的目錄中必須建立一個myid檔案,數位元組點唯一 zk1:echo “1” >>zkdata/myid,zk2:echo “2” >>zkdata/myid,zk3:echo “3” >>zkdata/myid,在每一個zk節點資料目錄中建立一個名字為zoo.cfg zk配置檔案 zk1-2-3:touch zkdata/zoo.cfg

修改不同的節點配置zk1-2-3

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/root/zkdata1

clientPort=300

server.1=主機名:3002:3003

server.2=主機名:4002:4003

server.3=主機名:5002:5003

啟動zk節點:如果沒有配置zk的環境變量需要進入bin才能啟動,./zkServer.sh start /root/zkdata/zoo_sampe.cfg ,檢視狀态:./zkServer.sh status /root/zkdata/zoo_sampe.cfg

4.1.2、然後搭建HDFS叢集:

設定主機名和ip映射,配置環境變量,配置ssh免密碼登入,在namenode上面生成公私秘鑰,将公鑰傳給datanode的伺服器,ssh-keygen -t rsa 生成公私秘鑰,ssh-copy-id namenode主機名,在各個伺服器上面必須安裝依賴 yum install psmisc -y 在每一台伺服器上面安裝hadoop并且壓縮,配置hadoop的環境變量(否則指令隻能在bin中執行)source /etc/profile 測試 echo $PATH

修改hadoop中的配置檔案:hadoop-env.sh

  • core-size.xml中的配置檔案
- <!--hdfs主要入口不再是一個具體機器而是一個虛拟的名稱ns 後面配置檔案會映射所對應所有namenode -->
- <property>
  <name>fs.defaultFS</name>
  <value>hdfs://ns</value>
  </property>
- <property>
  <name>hadoop.tmp.dir</name>
  <value>/root/hadoop-2.9.2/data</value>
  </property>
- <!--這裡的hadoop指的是zk叢集對應的每一個伺服器名稱和對應的端口-->
- <property>
  <name>ha.zookeeper.quorum</name>
   <value>hadoop1:3001,hadoop1:4001,hadoop1:5001</value>
   </property>
- hdfs-site.xml中的配置檔案
- <!--指定hdfs的nameservice為ns,這裡和core-size.xml中保持一緻-->
  	<property>
  		  <name>dfs.nameservices</name>
  		  <value>ns</value>
  	  </property>
- <!-- ns下面有兩個NameNode,分别是nn1,nn2 -->
  <property>
  		  <name>dfs.ha.namenodes.ns</name>
  		  <value>nn1,nn2</value>
  	  </property>
- <!-- nn1的RPC通信位址 -->
  <property>
  		  <name>dfs.namenode.rpc-address.ns.nn1</name>
  		  <value>hadoop2:9000</value>
  	  </property>
- <!-- nn1的http通信位址 -->
  <property>
  		  <name>dfs.namenode.http-address.ns.nn1</name>
  		  <value>hadoop2:50070</value>
  	  </property>
- <!-- nn2的RPC通信位址 -->
  <property>
  		  <name>dfs.namenode.rpc-address.ns.nn2</name>
  		  <value>hadoop3:9000</value>
  	  </property>
  	  <!-- nn2的http通信位址 -->
  	  <property>
  		  <name>dfs.namenode.http-address.ns.nn2</name>
  		  <value>hadoop3:50070</value>
  	  </property>
- <!-- 指定NameNode的中繼資料在JournalNode上的存放位置 這裡的hadoop是journalNode所在伺服器-->
  	<property>
  		<name>dfs.namenode.shared.edits.dir</name>
  		<value>qjournal://hadoop2:8485;hadoop3:8485;hadoop4:8485/ns</value>
  	</property>
- <!-- 指定JournalNode在本地磁盤存放資料的位置 -->
  <property>
  		<name>dfs.journalnode.edits.dir</name>
  		<value>/root/journal</value>
  	</property>
- <!-- 開啟NameNode故障時自動切換 -->
  <property>
  		<name>dfs.ha.automatic-failover.enabled</name>
  		<value>true</value>
  	</property>
- <!-- 配置失敗自動切換實作方式 -->
  <property>
  		<name>dfs.client.failover.proxy.provider.ns</name>
  		<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  	</property>
- <!-- 配置隔離機制,如果ssh是預設22端口,value直接寫sshfence即可 -->
  <property>
  		<name>dfs.ha.fencing.methods</name>
  		<value>sshfence</value>
         <value>shell(true)</value>
  	</property>
- <!-- 使用隔離機制時需要ssh免登陸 -->
  <property>
  		<name>dfs.ha.fencing.ssh.private-key-files</name>
  		<value>/root/.ssh/id_rsa</value>
  	</property>
           
  • 修改slaves檔案指定哪些機器為DataNode

    hadoop2

    hadoop3

    hadoop4

  • 在任意一個namenode上執行格式化zk指令:hdfs zkfc -formatZK
  • 啟動journalnode(分别在每一個存在journalnode上執行hadoop-daemon.sh start journalnode)

    使用:jps檢視如過都出現journalnode程序說明啟動成功`

    在(NameNode active)節點執行格式化指令:hdfs namenode -format ns

    啟動hdfs叢集:start-dfs.sh

    在standby 的 NameNode節點上執行格式化指令: hdfs namenode -bootstrapStandby

    啟動standby 的 NameNode節點:hadoop-daemon.sh start namenode

4.2MapReduce

4.2.1核心:計算 hadoop體系下面的一個程式設計模型,計算架構 主要是用來操作HDFS中存儲資料并對HDFS中資料進行計算。

4.2.2:Map:局部計算 Reduce:彙總計算 (兩者都是需要我們寫的代碼)

4.3 Yarn 理論概念

1、概念:統一資源排程器,任務監控管理器

2、作用:整合hadoop叢集中的資源(cpu,記憶體)進行統一的排程 哪個伺服器需求大 就給誰。

任務監控 監控map和reduce的執行情況
           

Yarn = ResourceManage (master) + NodeManager(slave)

注意:DataNode伺服器一定是NodeManager

ResourceManager作用-----------真正的資源管理者,監控者,決定給每個伺服器的資源多少!

NodeManager作用1--------- map和Reduce的執行者 他們是程式員寫的計算代碼 NodeManager在每一個Datanode中執行每一個map(map計算block内容) 就相當于idea的功能 彙總計算等待所有局部計算完成後 在一個大功能伺服器中(存在NodeManage的伺服器)将所有計算彙總 同樣也是NodeManager執行,

作用2-------NodeManager還會向ResourceManager彙報資訊,需要多少記憶體和cpu等資源(其實是ApplicationMaster彙報的)确定此NodeManager還活着。

3.Job作業

一組MapReduce也被稱之為一個job作業,代表一種計算 hadoop的叢集

4.4 Yarn叢集搭建

1、叢集規劃:

10.15.0.15 hadoop15 NameNode DataNode NodeManager

10.15.0.16 hadoop16 DataNode NodeManager ResourceManager

10.15.0.17 hadoop17 DataNode NodeManager

2、

a、修改ip位址 修改主機名稱 重新開機機器

b、配置主機名稱和對應的ip映射。

c、配置ssh免密登入。

NameNode hadoop15: 生成ssh-keygen ssh-copy-id hadoop15 16 17

ResourceManager hadoop16: 生成ssh-keygen ssh-copy-id hadoop15 16 17

d、配置環境變量

3、安裝hadoop 配置檔案

hadoop-enc.sh core-site.xml hdfs-size.xml maped-site.xml

yarn-site.xml slaves:作用 決定namenode中的datanode是誰

決定resourceManager的nodemanager是誰

4、hdfs叢集相關配置省略:

4.1.複制 cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml

4.2.編輯 vim hadoop-2.9.2/etc/hadoop/mapred-site.xml添加配置

<property>
	<name>mapreduce.framework.name</name>
	<value>yarn</value>
</property>
           

4.3 配置yarn-site.xml

<property>
	<name>yarn.nodemanager.aux-services</name>
	<value>mapreduce_shuffle</value>
</property>
<!--`hadoop`為ResourceManager目前機器的主機名-->
<property>
	<name>yarn.resourcemanager.hostname</name>
	<value>Hadoop</value>
</property>
           

4.4、啟動yarn:隻能在有ResourceManager的伺服器上面啟動: start-yarn.sh

浏覽器通路yarn頁面: http://10.15.0.16:8088/cluster

五.hadoop第五天

5.1 Job作業體系結構

5.1.1:job作業的資料來源一定是Hdfs,最總結果儲存到hdfs中,在整個五個階段中 map階段和reduce階段是程式員代碼手工編碼的。

hdfs----->inputFormat------>map------------------>shuffle---------------->reduce------->outputFormat------->hdfs

hdfs----->資料讀入 ------->局部計算---->局部計算資料排序和分組---->彙總計算---->結果輸出---------------->hdfs

5.1.2:

InputFormat: keyin—>行字母偏移量 Longwritable valuein------->讀取的一行 字元串 Text

keyout--->行字母偏移量  Longwritable    valueout------->讀取的一行 字元串  Text
           

map : keyin—>行字母偏移量 Longwritable valuein------->讀取的一行 字元串 Text

keyout--->每一行得到的資料(或者分割的部分資料)Text    valueout---->根據需求設定
           

shuffle :局部計算進行排序和彙總

reduce :keyin----->根據map傳過來的進行判斷 value------->根據map傳過來的進行判斷

keyout--->根據需求設定           valueout---->根據需求設定
           

5.2 mapreduce程式代碼

5.2.1:步驟:

準備資料檔案 将資料檔案上傳到hdfs上面

引入依賴 開發job作業(開發map階段 開發reduces階段 開發job階段)

5.2.2:開發Job作業編碼

//word count job作業開發
public class WordCountJob extends Configured implements Tool {


    public static void main(String[] args) throws Exception {
        ToolRunner.run(new WordCountJob(),args);
    }

    @Override
    public int run(String[] strings) throws Exception {
        //建立job作業
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountJob.class);

        //設定Input Format
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("/wordcount/data"));

        //設定map階段
        job.setMapperClass(WordCountMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //設定Shuffle 階段 預設

        //設定reduce 階段
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //設定 Output Formate
        job.setOutputFormatClass(TextOutputFormat.class);
        //注意:要求結果目錄不能存在
        FileSystem fileSystem = FileSystem.get(conf);
        Path res = new Path("/wordcount/res");
        if(fileSystem.exists(res)) {
            fileSystem.delete(res,true);
        }
        TextOutputFormat.setOutputPath(job, res);

        //送出job作業
        boolean b = job.waitForCompletion(true);
        System.out.println("作業執行狀态 = " + b);

        return 0;
    }

    //開發Map階段
    public static class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //value 就是讀入的一行資料
            String[] keys = value.toString().split(" ");
            for (String word : keys) {
                context.write(new Text(word),new IntWritable(1));
            }
        }
    }

    //開發Reduce階段
    public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum+=value.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }

}
注意:在執行mapreduce作業過程中,一定涉及到資料資料的序列化,hadoop對原始基本資料類型進行了二次包裝

  hadoop中包裝類型   	java原始資料類型
  Text          	String    
  LongWritable  	Long      
  IntWritable   	Integer   
  FloatWritable 	Float     
  DoubleWritable	Double    

           

5.3MapReduce 自動化運作

5.3.1打包時指定main Class資訊

5.3.2使用wagon插件實作自動上傳至hadoop叢集

5.3.3使用wagon上傳jar完成後遠端執行job作業

<build>
  <!--擴充maven的插件中加入ssh插件-->
	<extensions>
		<extension>
			<groupId>org.apache.maven.wagon</groupId>
			<artifactId>wagon-ssh</artifactId>
			<version>2.8</version>
		</extension>
	</extensions>
	<plugins>
        <!-- 在打包插件中指定main class 資訊 -->
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
              <outputDirectory>${basedir}/target</outputDirectory>
              <archive>
                <manifest>
                  <mainClass>com.baizhi.wordcount.WordCountJob</mainClass>
                </manifest>
              </archive>
            </configuration>
          </plugin>
		<plugin>
              <groupId>org.codehaus.mojo</groupId>
              <artifactId>wagon-maven-plugin</artifactId>
              <version>1.0</version>
              <configuration>
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <url>scp://root:[email protected]/root</url>
                    <commands>
                      <!-- 通過sh 執行shell腳本檔案 -->
                      <command>nohup hadoop-2.9.2/bin/hadoop jar hadoop_wordcount-1.0-SNAPSHOT.jar > /root/mapreduce.out 2>&amp;1 &amp; </command>
                    </commands>
                    <displayCommandOutputs>true</displayCommandOutputs>
              </configuration>
        </plugin>
	</plugins>
</build>
           

導入插件maven helper

wagon配置加入commands指令執行指令操作:clean package wagon:upload-single wagon:sshexec

5.4 配置曆史伺服器

5.4.1配置mapped-site.xml 并同步叢集配置

<property>
   <name>mapreduce.jobhistory.address</name>
   <value>hadoop5:10020</value>
</property>
<property>
   <name>mapreduce.jobhistory.webapp.address</name>
   <value>hadoop5:19888</value>
</property> 
           

5.4.2 配置yarn-site.xml 并同步叢集配置`

<!--開啟日志聚合-->
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
<!--日志儲存時間 機關秒 這裡是7天-->
<property> 
  <name>yarn.log-aggregation.retain-seconds</name>
  <value>604800</value>
</property>
           

5.4.3 啟動曆史伺服器

[root@hadoop6 ~]# mr-jobhistory-daemon.sh start historyserver
[root@hadoop6 ~]# mr-jobhistory-daemon.sh stop historyserver
           

5.4.5使用日志

private Logger logger= Logger.getLogger(想打日志的類對象);

logger.info(想列印的資訊)

六.hadoop第六天

6.1自定義類型

資料類型都間接實作了:Wirtable(實作序列化反序列化功能,實作他的write寫的序列化,readFields讀的序列化)Comparable(實作排序功能,實作它的compareTo比較方法完成排序功能) 。直接實作WritableComparable接口(此接口繼承Writable,Comparable),是以我們自定義類型也需要實作相應的接口

通過檢視源碼得知自定義的資料類型需要實作類中 wirte、readFiles、compareTo、hashCode 和equals、toString等相關方法。如果存在位址不同的自定義對象位址相同,則需要繼承hashCode 和equals方法。

開發自定義Writable類型

//自定義Writable類型
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {
    private Integer upload;
    private Integer down;
    private Integer total;

    @Override
    public int compareTo(AccessLogWritable o) {
        return this.total-o.getTotal();
    }
	//把對象序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upload);
        out.writeInt(down);
        out.writeInt(total);
    }
	//反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upload = in.readInt();
        this.down = in.readInt();
        this.total = in.readInt();
    }

    @Override
    public String toString() {
        return "統計結果{" +
                "上傳流量=" + upload +
                ", 下載下傳流量=" + down +
                ",  上傳下載下傳總流量=" + total +
                '}';
    }
    注意:write的順序和read的順序必須嚴格一緻,讀的類型和寫的類型也必須完全一緻
           

6.2mapreduce進階特性

6.2.1、mapreduce 計算、去重、排序、清洗工作

資料清洗:在複雜的資料中抽取我們業務關注的資料的過程稱之為資料的清洗。

資料清洗的好處:減小資料的檔案體積、加速資料的計算效率。

資料清洗沒有reduce階段 這個設定是job.setNumReduceTasks(0)

6.2.2、reduce個數設定!

預設job作業中隻有一個reduce,可以設定多個reduce,通過job.setNumReduceTasks(n)設定有n個。

設定多個reduce提高了mapreduce的運作效率,一旦設定了多個reduce,結果會根據reduce數量放入不同的産生的檔案中,為了保障每個reduce處理的資料均衡 mr引入了分區的概念(partition)

Reduce的數量是可以在程式中手動指定

預設數量為:  1個 Reduce
	可以通過:    job.setNumReduceTasks(0);  0 就是沒有   數字是幾就是幾個
           

6.2.3、自定義分區

//自定義分區 輸入資料map端結果
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {
    //根據業務規則将不同省份結果劃分到不同分區
    private static  HashMap<String,Integer> provincePartitioners =  new HashMap<>();
    static{
        provincePartitioners.put("136",0);
        provincePartitioners.put("137",1);
        provincePartitioners.put("138",2);
        provincePartitioners.put("139",3);
    }

    // 傳回分區号給那個reduce
    @Override
    public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
        String keyPrefix = key.toString().substring(0, 3);
        Integer partionId = provincePartitioners.get(keyPrefix);
        return partionId ==null?4: partionId;
    }
}
//設定分區
job.setPartitionerClass(ProvincePartitioner.class);
//設定reduce數量
job.setNumReduceTasks(5);
           

6.3合并

Combiner合并:又稱之為map端的reduce,主要是通過對map局部的資料先進行一次reduce,進而來減少map端輸出資料頻繁發送給Reduce處理時所帶來的網絡壓力問題。通過這種提前對map輸出做一次局部reduce,這樣既可以減輕網絡壓力,又能提高效率。在mapreduce程式設計模型中預設是關閉的。

開啟Combiner

//shuffle 無須設定 自動處理
//設定Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);
//設定分區
job.setPartitionerClass(ProvincePartitioner.class);
...............
           

6.4mapreduce的運作原理

#1.計算切片
    	有幾個切片就有幾個map task
    #2.環形緩存區
    	經過map函數的邏輯處理後的資料輸出之後,會通過OutPutCollector收集器将資料收集到環形緩存區儲存。
    	環形緩存區的大小預設為100M,當儲存的資料達到80%時,就将緩存區的資料溢出到磁盤上儲存。
    #3.溢出
    	環形緩存區的資料達到其容量的80%時就會溢出到磁盤上進行儲存,在此過程中,程式會對資料進行分區(預設HashPartition)和排序(預設根據key進行快排)
    	緩存區不斷溢出的資料形成多個小檔案
    #4.合并
    	溢出的多個小檔案各個區合并在一起(0區和0區合并成一個0區),形成大檔案
    	通過歸并排序保證區内的資料有序
    #5.shuffle
    	從過程2到過程7之間,即map任務和reduce任務之間的資料流稱為shuffle(混洗),而過程5最能展現出混洗這一概念。一般情況下,一個reduce任務的輸入資料來自與多個map任務,多個reduce任務的情況下就會出現如過程5所示的,
    	每個reduce任務從map的輸出資料中擷取屬于自己的那個分區的資料。
    #6.合并
    	運作reducetask的節點通過過程5,将來自多個map任務的屬于自己的分區資料下載下傳到本地磁盤工作目錄。這多個分區檔案通過歸并排序合并成大檔案,并根據key值分好組(key值相同的,value值會以疊代器的形式組在一起)。
    #7.reducetask
    	reducetask從本地工作目錄擷取已經分好組并且排好序的資料,将資料進行reduce函數中的邏輯處理。
    #8.輸出
    	每個reducetask輸出一個結果檔案。

           

6.5job作業送出過程

1.向ResourceManager請求運作一個mapreduce程式。
2.ResourceManager傳回hdfs位址,告訴用戶端将作業運作相關的資源檔案上傳到hdfs。
3.用戶端送出mr程式運作所需的檔案(包括作業的jar包,作業的配置檔案,分片資訊等)到hdfs上。
4.作業相關資訊送出完成後,用戶端用過調用ResourcrManager的submitApplication()方法送出作業。
5.ResourceManager将作業傳遞給排程器,排程器的預設排程政策是先進先出。
6.排程器尋找一台空閑的節點,并在該節點隔離出一個容器(container),容器中配置設定了cpu,記憶體等資源,并啟動MRAppmaster程序。
7.MRAppmaster根據需要運作多少個map任務,多少個reduce任務向ResourceManager請求資源。
8.ResourceManager配置設定相應數量的容器,并告知MRAppmaster容器在哪。
9.MRAppmaster啟動maptask。
10.maptask從HDFS擷取分片資料執行map邏輯。
11.map邏輯執行結束後,MRAppmaster啟動reducetask。
12.reducetask從maptask擷取屬于自己的分區資料執行reduce邏輯。
13.reduce邏輯結束後将結果資料儲存到HDFS上。
14.mapreduce作業結束後,MRAppmaster通知ResourceManager結束自己,讓ResourceManager回收所有資源。