hadoop
一.Hadoop第一天
1.1 大資料引言
1、什麼是大資料?
對大量資料集檔案處理的過程 b—k---m—G---T—PB—EB—ZB 資料 換算機關 1024
2、大資料的特點?
(1)Volume大量 資料量非常大 TB級别以上的
(2)Variety多樣 資料格式多樣 結構化資料:資料庫中的資料 半結構化資料:json mongdb 非結構化資料:圖檔,視訊 ,音頻。
(3)、Velocity快速 資料的處理一定要快
(4)、Value價值 在大量資料中分析出自己有價值的資料
1.2 hadoop核心設計:
(1)hdfs 大資料檔案存儲問題
HDFS(Hadoop Distributed File System)Hadoop 分布式檔案系統:
基于流資料模式通路
就是可以位元組序列化的資料,java.io.Serializable接口
分布式檔案系統處理的資料必須是流資料,可以寫IO操作的資料
它是以128MB的資料塊 存儲檔案(在Hadoop 1.x版本中是以64MB的資料塊存儲檔案的)
其中每一個存儲節點上都有一個DataNode程序,由NameNode來進行協調。
(2)mapreduce 程式設計模型 計算架構
1.3 hdfs架構的簡單原理:
NameNode: 是整個HDFS叢集的總入口,存儲着HDFS的叢集的檔案中繼資料(如:client上傳檔案的檔案名 副本數 塊數等相關資訊)。
- DataNode: 是真正用來負責存儲資料的節點,一個DataNode就是一個真實的實體主機。
- Block: 資料塊,為了能通過多個節點儲存大資料集,HDFS将大資料集檔案切分成一塊塊的資料塊,在現有hadoop2版本中預設一個塊大小為128M。
metadata:元檔案 (ip 檔案名 block(預設128M) 副本 配置設定到那個datanode)
1.4 hadoop安裝:
1、安裝centos7、x虛拟機,并且啟動
2、輸入hostname 檢視目前主機名字
3、使用vim /etc/hostname 修改主機名字
4、添加主機名字并且和ip映射 vim /etc/hosts
格式 ip(目前的ip位址) centos(主機名字)
5、重新centos系統 reboot 或者關閉虛拟機 重新連接配接
6、必須安裝jdk 并且配置環境變量
7、hadoop下面的目錄及其内部檔案結構
bin ----------可執行的二進制腳本檔案 etc/hadoop目錄 hadoop系統配置檔案所在的目錄
hadoop-env.sh -----------配置環境
core-site.xml ----------配售hdfs叢集的核心配置
hdfs-site.xml ----------用來對hdfs檔案系統做配置的
share -----------用來存放hadoop的依賴jar第三方jar目錄
lib ----------用來存放hadoop使用核心庫檔案
8、配置hadoop的環境變量 /etc/profile
在結尾加上export HADOOP_HOME=/usr/hadoop-2.9.2
export PATH=PATHJAVA_HOME/bin:HADOOP_HOME/bin:HADOOP_HOME/sbin
9、配置core-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml 加入如下配置:
(就是配置namenode的全局入口====給哪個機器配置namenode 相當于配置位址)
<configuration>
<!--配置hdfs檔案系統預設名稱-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop(主機名):9000</value>
</property>
</configuration>
10、配置hdfs-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml 加入如下配置: (配置檔案的副本個數
預設是三個)
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
11、配置slaves檔案(将namenode和dataname聯系起來 啟動了namenode namenode就啟動了datanode)
vim /usr/hadoop-2.9.2/etc/hadoop/slaves 加入如下配置: hadoop (目前主機名)
12、格式化hdfs
hdfs namenode -format (僅僅是第一次使用需要格式化)
注意:這裡的格式化是格式成hadoop可以識别的檔案系統,比如我們買了一塊硬碟我們需要格式化成windows或者mac,linux系統識别的檔案系統,才能使用這個檔案系統。
關閉防火牆
systemctl stop firewalld systemctl disable firewalld
二.hadoop第二天
2.1 hadoop中提供的兩種腳本
1、hadoop對hdfs叢集的管理提供了兩種腳本
a、hadoop-daemon.sh start namenode/dataname/secondarynamenode 本地啟動腳本
作用:對叢集中的單個節點進行操作
b、start-dfs.sh 叢集啟動腳本 作用:對進群中所有節點統一操作 指令:start-dfs.sh
可以在任意節點使用,如果在從節點使用,會找到從節點中core-size配置檔案中namenode的入口(就是namenode的位址,他在哪個伺服器上),然後嘗試登入到此namenode伺服器上,再通過namenode中的slaves配置檔案找到所有的datanode伺服器機器名字
通過事先配好的映射找到對應的伺服器ip登入他們,進而打開所有的datanode伺服器
本地啟動腳本,在叢集環境中需要去啟動每一台伺服器,工作量較大。
叢集啟動腳本,整個叢集隻需要啟動一次。啟動過程:随即在一台伺服器上輸入start-dfs.sh指令,然後會通過core-size.xml配置檔案中配置namendoe的位址,去找NameNode的所在的伺服器,會登入此伺服器啟動namenode,然後根據此台伺服器中的slaves配置檔案(namenode與datanode的關聯檔案)中的伺服器名稱,依次映射對應的ip位址找到對應的datanode伺服器,登入啟動所有datanode伺服器。
2.2 修改hdfs預設存儲位置data
通過檢視日志得知namenode資料datanode資料預設都是存放在/tmp/tmp/hadoop-root/dfs下(這裡是臨時的檔案存儲目錄),這對于我們來說是不安全的,系統可能會定期的清除目錄中的檔案,是以為了保證資料的安全完整性,我們需要修改此檔案的預設存儲位置。
2.2.1、修改hdfs預設資料的位置需要修改core-site.xml檔案配置 加入去下配置即可
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-2.9.2/data</value>
</property>
2.3 SSH工作原理和免密登入
解決啟動hsfs叢集時的登入驗證密碼問題,需要使用ssh協定的第二種方式
2.3.1、ssh協定:
第一種方式:基于密碼的安全驗證 第二種方式:基于密匙的安全驗證
2.3.2、ssh秘鑰原理說明:
存在兩個伺服器A,B。A通路登入B,A需要生成一組秘鑰(公鑰和私鑰),A将公鑰配置在B的信任檔案中,在A要登入B的時候,A會帶着公鑰去找B,B拿到公鑰後先判斷信任檔案中是否含有此公鑰,如果沒有,會拒絕A的登入,如果有會根據此公鑰和随機産生的字元串生成新的資詢再傳給A,A根據私鑰将質詢破解,得到字元串再傳回給B,如果B此次得到的字元串和随機生成的字元串完全相同,則A成功登入B。
2.3.3、配置SSH免密登入
- 生成ssh秘鑰對 ssh-keygen -t rsa 然後回車幾次就可以啦
檢視秘鑰對生成位置 ls /root/.ssh 會發現在home目錄中生成了兩個檔案
id_rsa(私鑰) id_rsa.pub(公鑰)
- 将公鑰加入另一台機器的受信清單中 ssh-copy-id hadoop(主機名) cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys (和上面指令一樣)
- 再次檢視/root/.ssh 目錄 多出兩個檔案其中authorized_keys就是存放公鑰清單檔案 authorized_keys id_rsa id_rsa.pub known_hosts
- 檢測是否配置成功 ssh hadoop 不需要輸入密碼即可
2.3.4、Shell基本指令總結
檢視目錄:hdfs dfs -ls / 上傳檔案:hdfs dfs -put aa.txt / 建立檔案(疊代建立):hdfs
dfs -mkdir -p /bbb/cccc 檢視檔案内容:hdfs dfs -cat /aa.txt 追加檔案内容:hdfs
dfs -appendToFile bb.txt /aa.txt hdfs中複制檔案: hdfs dfs -cp /aa.txt
/datas 下載下傳檔案到本地:hdfs dfs -get /aa.txt /root/down.txt
查找檔案名的目錄:hdfs dfs -find / -name “aa.txt” 将hdfs檔案移動到hdfs另一個位置:hdfs
dfs -mv /bb.txt /datas/bb.txt
2.4 java操作HDFS
2.4.1. 引入依賴
<properties>
<hadoop.version>2.9.2</hadoop.version>
</properties>
<dependencies>
<!--hadoop公共依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--hadoop client 依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
2.4.2 擷取hdfs用戶端
public class TestHDFS {
private FileSystem fileSystem; //hdfs用戶端對象
@Before
public void before() throws IOException {
//hadoop檔案系統的權限設定為root 包裝僞裝使用者
System.setProperty("HADOOP_USER_NAME","root");
//或者在hdfs-size.xml中配置 修改windows對hdfs的權限 true是預設不支援權限
3
//用來對core-site.xml hdfs-site.xml進行配置
Configuration conf = new Configuration();
//連接配接hdfs
conf.set("fs.defaultFS","hdfs://10.15.0.4:9000");
//設定上傳檔案的副本集
conf.set("dfs.replication","1");
fileSystem = FileSystem.get(conf);
}
@After
public void close() throws IOException {
fileSystem.close();
}
}
2.4.3 上傳檔案到hdfs
@Test
public void testUpload() throws IOException {
//流入流 讀取流
FileInputStream is=new FileInputStream("計算機中要上傳檔案的絕對路徑")
Path path = new Path("上傳到hdfs上面的哪個檔案");
FSDataOutputSytream os = fileSystem.create(path);
//參數1:讀取流、參數2 列印流、參數3:位元組數、參數4:是否關流
IOUtils.copyBytes(is,"os",1024,true);
}
2.4.4 hdfs下載下傳檔案`
// 1.第一種方式
@Test
public void testDownload() throws IOException {
Path source = new Path("hdfs上面要下載下傳檔案的路徑");
Path path = new Path("要下載下傳到本地的位置");
//參數一:是否删除源檔案 參數二:hdfs上下載下傳的檔案 參數三:下載下傳到本地檔案的位置 參數四:windows相容
fileSystem.copyTolocalFile(flase,source,path,true);
}
// 2.第二種方式
@Test
public void testDownload1() throws IOException {
Path path = new Path("在hdfs上面要下載下傳的檔案");
FSDataInputStream in = fileSystem.open(path);
FileOutputStream fileOutputStream = new FileOutputStream("要下載下傳到本機的絕對路徑");
//參數一:讀取流 參數二:列印流 參數三:位元組數 參數四:是否關流
IOUtils.copyBytes(in,fileOutputStream,1024,true);
}
2.4.5 展示hdfs目錄和檔案
@Test
public void testListDirs() throws IOException {
Path path = new Path("/");
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
System.out.println(fileStatus.isDirectory()+" "+fileStatus.getPath());
}
}
三.hadoop第三天
3.1hadoop體系下的配置檔案
hadoop配置檔案解析順序 從高到低
- javacode中Confriguration 設定會覆寫core-default.xml hdfs-default.xml中内容
- javaclient jar 中的預設 core-default.xml hdfs-default.xml中内容 (預設dataname有三個)
- hadoop安裝包自定義配置 hadoop etc hadoop core-size.xml hdfs-size.xml内容會覆寫hadoop中預設内容
- hadoop中預設的core-default.xml hdfs-default.xml中内容(預設dataname有三個)
3.2 NameNode 和 SecondaryNameNode
3.2.1、namenode的中繼資料資訊到底存放在哪裡?
由于client對hdfs操作過于頻繁,操作的檔案都是随機操作,是以為了提供hdfs的叢集效率,hdfs叢集将 namenode中資料(中繼資料)始終放在記憶體中。
特點:不安全,記憶體斷電會立即丢失資料。
3.2.2、如何對NameNode資料進行持久化?
FsImage:儲存NameNode目前這一時刻資料狀态 相當于快照
Editslog:日志檔案,隻記錄client到hdfs寫操作(二級制檔案)
用來記錄使用者的寫操作。每當中繼資料有更新或者添加中繼資料時,修改記憶體中的中繼資料并追加到Edits中。
3.2.3、如何解決Edits日志檔案越來越大的問題?
secondaryNameNode:用來定期的完成對FsImage和Edits日志的操作。
規定是一個小時,或者用戶端進行1000000次操作時進行合并。
面試題:secondaryNameNode有什麼作用,沒有可不可以?
答:可以,但是會使edist日志檔案越來越大,效率大大降低。
NameNode在關閉的時候會預設進行edits和fsimage的合并,每次啟動NameNdoe,他都會滾動一次
産生新的edits1… 新的用戶端記錄檔會存儲在新産生的edits1中
3.3 HDFS的HA叢集原理分析:
3.3.1、簡單HDFS叢集中存在的問題:
單節點問題,單節點自動故障轉移
3.3.2、如何解決NameNode單節點問題?
a、找另一個namenode備份原有的namenode資料
b、如何解決叢集中的腦裂問題(一個叢集中多個管理者資料不一緻這種情況稱之為------腦裂)
3.3.3、如何解決啟動多個NameNode時保證同一時刻隻有一個namenode工作,避免腦裂問題?
QJM使用Zookeeper 完成高可用
面試題:在HA叢集中如果ZK與NameNode(advice)出現網絡上延遲這種情況,ZK會自動切換NameNode(standby)為活躍節點,這個時候叢集就出現了多個活躍的NameNode(advice) 也就意味着出現了腦裂的問題!怎麼解決?
提示:考察的是JournalNode中的兩種功能: 1、同步資料 2、隔離機制
答案:JournalNode有一下兩種功能:
1、負責NameNode中edits的同步資料。
2、JournalNode的隔離機制,因為它是唯一個連接配接着兩個NameNode的元件,是以他會第一個發現。為了保證這一時刻隻有一個NameNode(advice)活躍,他會使用ssh登入到NameNode節點上,使用kill指令殺死NameNode
四.hadoop第四天
4.1高可用HDFS具體步驟:
4.1.1、首先搭建ZK叢集,安裝zk tar -zxvf zookeeper-3.4.12.tar.gz,同步zk2 zk3 scp -r zookeeper-3.4.12 [email protected]:/root/ ,在每一個節點上建立zk資料目錄mkdir /root/zkdata,在每一個節點存在zk資料的目錄中必須建立一個myid檔案,數位元組點唯一 zk1:echo “1” >>zkdata/myid,zk2:echo “2” >>zkdata/myid,zk3:echo “3” >>zkdata/myid,在每一個zk節點資料目錄中建立一個名字為zoo.cfg zk配置檔案 zk1-2-3:touch zkdata/zoo.cfg
修改不同的節點配置zk1-2-3
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata1
clientPort=300
server.1=主機名:3002:3003
server.2=主機名:4002:4003
server.3=主機名:5002:5003
啟動zk節點:如果沒有配置zk的環境變量需要進入bin才能啟動,./zkServer.sh start /root/zkdata/zoo_sampe.cfg ,檢視狀态:./zkServer.sh status /root/zkdata/zoo_sampe.cfg
4.1.2、然後搭建HDFS叢集:
設定主機名和ip映射,配置環境變量,配置ssh免密碼登入,在namenode上面生成公私秘鑰,将公鑰傳給datanode的伺服器,ssh-keygen -t rsa 生成公私秘鑰,ssh-copy-id namenode主機名,在各個伺服器上面必須安裝依賴 yum install psmisc -y 在每一台伺服器上面安裝hadoop并且壓縮,配置hadoop的環境變量(否則指令隻能在bin中執行)source /etc/profile 測試 echo $PATH
修改hadoop中的配置檔案:hadoop-env.sh
- core-size.xml中的配置檔案
- <!--hdfs主要入口不再是一個具體機器而是一個虛拟的名稱ns 後面配置檔案會映射所對應所有namenode -->
- <property>
<name>fs.defaultFS</name>
<value>hdfs://ns</value>
</property>
- <property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-2.9.2/data</value>
</property>
- <!--這裡的hadoop指的是zk叢集對應的每一個伺服器名稱和對應的端口-->
- <property>
<name>ha.zookeeper.quorum</name>
<value>hadoop1:3001,hadoop1:4001,hadoop1:5001</value>
</property>
- hdfs-site.xml中的配置檔案
- <!--指定hdfs的nameservice為ns,這裡和core-size.xml中保持一緻-->
<property>
<name>dfs.nameservices</name>
<value>ns</value>
</property>
- <!-- ns下面有兩個NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns</name>
<value>nn1,nn2</value>
</property>
- <!-- nn1的RPC通信位址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn1</name>
<value>hadoop2:9000</value>
</property>
- <!-- nn1的http通信位址 -->
<property>
<name>dfs.namenode.http-address.ns.nn1</name>
<value>hadoop2:50070</value>
</property>
- <!-- nn2的RPC通信位址 -->
<property>
<name>dfs.namenode.rpc-address.ns.nn2</name>
<value>hadoop3:9000</value>
</property>
<!-- nn2的http通信位址 -->
<property>
<name>dfs.namenode.http-address.ns.nn2</name>
<value>hadoop3:50070</value>
</property>
- <!-- 指定NameNode的中繼資料在JournalNode上的存放位置 這裡的hadoop是journalNode所在伺服器-->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop2:8485;hadoop3:8485;hadoop4:8485/ns</value>
</property>
- <!-- 指定JournalNode在本地磁盤存放資料的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/journal</value>
</property>
- <!-- 開啟NameNode故障時自動切換 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
- <!-- 配置失敗自動切換實作方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
- <!-- 配置隔離機制,如果ssh是預設22端口,value直接寫sshfence即可 -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
<value>shell(true)</value>
</property>
- <!-- 使用隔離機制時需要ssh免登陸 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
-
修改slaves檔案指定哪些機器為DataNode
hadoop2
hadoop3
hadoop4
- 在任意一個namenode上執行格式化zk指令:hdfs zkfc -formatZK
-
啟動journalnode(分别在每一個存在journalnode上執行hadoop-daemon.sh start journalnode)
使用:jps檢視如過都出現journalnode程序說明啟動成功`
在(NameNode active)節點執行格式化指令:hdfs namenode -format ns
啟動hdfs叢集:start-dfs.sh
在standby 的 NameNode節點上執行格式化指令: hdfs namenode -bootstrapStandby
啟動standby 的 NameNode節點:hadoop-daemon.sh start namenode
4.2MapReduce
4.2.1核心:計算 hadoop體系下面的一個程式設計模型,計算架構 主要是用來操作HDFS中存儲資料并對HDFS中資料進行計算。
4.2.2:Map:局部計算 Reduce:彙總計算 (兩者都是需要我們寫的代碼)
4.3 Yarn 理論概念
1、概念:統一資源排程器,任務監控管理器
2、作用:整合hadoop叢集中的資源(cpu,記憶體)進行統一的排程 哪個伺服器需求大 就給誰。
任務監控 監控map和reduce的執行情況
Yarn = ResourceManage (master) + NodeManager(slave)
注意:DataNode伺服器一定是NodeManager
ResourceManager作用-----------真正的資源管理者,監控者,決定給每個伺服器的資源多少!
NodeManager作用1--------- map和Reduce的執行者 他們是程式員寫的計算代碼 NodeManager在每一個Datanode中執行每一個map(map計算block内容) 就相當于idea的功能 彙總計算等待所有局部計算完成後 在一個大功能伺服器中(存在NodeManage的伺服器)将所有計算彙總 同樣也是NodeManager執行,
作用2-------NodeManager還會向ResourceManager彙報資訊,需要多少記憶體和cpu等資源(其實是ApplicationMaster彙報的)确定此NodeManager還活着。
3.Job作業
一組MapReduce也被稱之為一個job作業,代表一種計算 hadoop的叢集
4.4 Yarn叢集搭建
1、叢集規劃:
10.15.0.15 hadoop15 NameNode DataNode NodeManager
10.15.0.16 hadoop16 DataNode NodeManager ResourceManager
10.15.0.17 hadoop17 DataNode NodeManager
2、
a、修改ip位址 修改主機名稱 重新開機機器
b、配置主機名稱和對應的ip映射。
c、配置ssh免密登入。
NameNode hadoop15: 生成ssh-keygen ssh-copy-id hadoop15 16 17
ResourceManager hadoop16: 生成ssh-keygen ssh-copy-id hadoop15 16 17
d、配置環境變量
3、安裝hadoop 配置檔案
hadoop-enc.sh core-site.xml hdfs-size.xml maped-site.xml
yarn-site.xml slaves:作用 決定namenode中的datanode是誰
決定resourceManager的nodemanager是誰
4、hdfs叢集相關配置省略:
4.1.複制 cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml
4.2.編輯 vim hadoop-2.9.2/etc/hadoop/mapred-site.xml添加配置
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
4.3 配置yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--`hadoop`為ResourceManager目前機器的主機名-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>Hadoop</value>
</property>
4.4、啟動yarn:隻能在有ResourceManager的伺服器上面啟動: start-yarn.sh
浏覽器通路yarn頁面: http://10.15.0.16:8088/cluster
五.hadoop第五天
5.1 Job作業體系結構
5.1.1:job作業的資料來源一定是Hdfs,最總結果儲存到hdfs中,在整個五個階段中 map階段和reduce階段是程式員代碼手工編碼的。
hdfs----->inputFormat------>map------------------>shuffle---------------->reduce------->outputFormat------->hdfs
hdfs----->資料讀入 ------->局部計算---->局部計算資料排序和分組---->彙總計算---->結果輸出---------------->hdfs
5.1.2:
InputFormat: keyin—>行字母偏移量 Longwritable valuein------->讀取的一行 字元串 Text
keyout--->行字母偏移量 Longwritable valueout------->讀取的一行 字元串 Text
map : keyin—>行字母偏移量 Longwritable valuein------->讀取的一行 字元串 Text
keyout--->每一行得到的資料(或者分割的部分資料)Text valueout---->根據需求設定
shuffle :局部計算進行排序和彙總
reduce :keyin----->根據map傳過來的進行判斷 value------->根據map傳過來的進行判斷
keyout--->根據需求設定 valueout---->根據需求設定
5.2 mapreduce程式代碼
5.2.1:步驟:
準備資料檔案 将資料檔案上傳到hdfs上面
引入依賴 開發job作業(開發map階段 開發reduces階段 開發job階段)
5.2.2:開發Job作業編碼
//word count job作業開發
public class WordCountJob extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new WordCountJob(),args);
}
@Override
public int run(String[] strings) throws Exception {
//建立job作業
Configuration conf = getConf();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountJob.class);
//設定Input Format
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("/wordcount/data"));
//設定map階段
job.setMapperClass(WordCountMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//設定Shuffle 階段 預設
//設定reduce 階段
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設定 Output Formate
job.setOutputFormatClass(TextOutputFormat.class);
//注意:要求結果目錄不能存在
FileSystem fileSystem = FileSystem.get(conf);
Path res = new Path("/wordcount/res");
if(fileSystem.exists(res)) {
fileSystem.delete(res,true);
}
TextOutputFormat.setOutputPath(job, res);
//送出job作業
boolean b = job.waitForCompletion(true);
System.out.println("作業執行狀态 = " + b);
return 0;
}
//開發Map階段
public static class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//value 就是讀入的一行資料
String[] keys = value.toString().split(" ");
for (String word : keys) {
context.write(new Text(word),new IntWritable(1));
}
}
}
//開發Reduce階段
public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum+=value.get();
}
context.write(key,new IntWritable(sum));
}
}
}
注意:在執行mapreduce作業過程中,一定涉及到資料資料的序列化,hadoop對原始基本資料類型進行了二次包裝
hadoop中包裝類型 java原始資料類型
Text String
LongWritable Long
IntWritable Integer
FloatWritable Float
DoubleWritable Double
5.3MapReduce 自動化運作
5.3.1打包時指定main Class資訊
5.3.2使用wagon插件實作自動上傳至hadoop叢集
5.3.3使用wagon上傳jar完成後遠端執行job作業
<build>
<!--擴充maven的插件中加入ssh插件-->
<extensions>
<extension>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-ssh</artifactId>
<version>2.8</version>
</extension>
</extensions>
<plugins>
<!-- 在打包插件中指定main class 資訊 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<outputDirectory>${basedir}/target</outputDirectory>
<archive>
<manifest>
<mainClass>com.baizhi.wordcount.WordCountJob</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>wagon-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<fromFile>target/${project.build.finalName}.jar</fromFile>
<url>scp://root:[email protected]/root</url>
<commands>
<!-- 通過sh 執行shell腳本檔案 -->
<command>nohup hadoop-2.9.2/bin/hadoop jar hadoop_wordcount-1.0-SNAPSHOT.jar > /root/mapreduce.out 2>&1 & </command>
</commands>
<displayCommandOutputs>true</displayCommandOutputs>
</configuration>
</plugin>
</plugins>
</build>
導入插件maven helper
wagon配置加入commands指令執行指令操作:clean package wagon:upload-single wagon:sshexec
5.4 配置曆史伺服器
5.4.1配置mapped-site.xml 并同步叢集配置
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop5:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop5:19888</value>
</property>
5.4.2 配置yarn-site.xml 并同步叢集配置`
<!--開啟日志聚合-->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!--日志儲存時間 機關秒 這裡是7天-->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
5.4.3 啟動曆史伺服器
[root@hadoop6 ~]# mr-jobhistory-daemon.sh start historyserver
[root@hadoop6 ~]# mr-jobhistory-daemon.sh stop historyserver
5.4.5使用日志
private Logger logger= Logger.getLogger(想打日志的類對象);
logger.info(想列印的資訊)
六.hadoop第六天
6.1自定義類型
資料類型都間接實作了:Wirtable(實作序列化反序列化功能,實作他的write寫的序列化,readFields讀的序列化)Comparable(實作排序功能,實作它的compareTo比較方法完成排序功能) 。直接實作WritableComparable接口(此接口繼承Writable,Comparable),是以我們自定義類型也需要實作相應的接口
通過檢視源碼得知自定義的資料類型需要實作類中 wirte、readFiles、compareTo、hashCode 和equals、toString等相關方法。如果存在位址不同的自定義對象位址相同,則需要繼承hashCode 和equals方法。
開發自定義Writable類型
//自定義Writable類型
public class AccessLogWritable implements WritableComparable<AccessLogWritable> {
private Integer upload;
private Integer down;
private Integer total;
@Override
public int compareTo(AccessLogWritable o) {
return this.total-o.getTotal();
}
//把對象序列化
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(upload);
out.writeInt(down);
out.writeInt(total);
}
//反序列化
@Override
public void readFields(DataInput in) throws IOException {
this.upload = in.readInt();
this.down = in.readInt();
this.total = in.readInt();
}
@Override
public String toString() {
return "統計結果{" +
"上傳流量=" + upload +
", 下載下傳流量=" + down +
", 上傳下載下傳總流量=" + total +
'}';
}
注意:write的順序和read的順序必須嚴格一緻,讀的類型和寫的類型也必須完全一緻
6.2mapreduce進階特性
6.2.1、mapreduce 計算、去重、排序、清洗工作
資料清洗:在複雜的資料中抽取我們業務關注的資料的過程稱之為資料的清洗。
資料清洗的好處:減小資料的檔案體積、加速資料的計算效率。
資料清洗沒有reduce階段 這個設定是job.setNumReduceTasks(0)
6.2.2、reduce個數設定!
預設job作業中隻有一個reduce,可以設定多個reduce,通過job.setNumReduceTasks(n)設定有n個。
設定多個reduce提高了mapreduce的運作效率,一旦設定了多個reduce,結果會根據reduce數量放入不同的産生的檔案中,為了保障每個reduce處理的資料均衡 mr引入了分區的概念(partition)
Reduce的數量是可以在程式中手動指定
預設數量為: 1個 Reduce
可以通過: job.setNumReduceTasks(0); 0 就是沒有 數字是幾就是幾個
6.2.3、自定義分區
//自定義分區 輸入資料map端結果
public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> {
//根據業務規則将不同省份結果劃分到不同分區
private static HashMap<String,Integer> provincePartitioners = new HashMap<>();
static{
provincePartitioners.put("136",0);
provincePartitioners.put("137",1);
provincePartitioners.put("138",2);
provincePartitioners.put("139",3);
}
// 傳回分區号給那個reduce
@Override
public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) {
String keyPrefix = key.toString().substring(0, 3);
Integer partionId = provincePartitioners.get(keyPrefix);
return partionId ==null?4: partionId;
}
}
//設定分區
job.setPartitionerClass(ProvincePartitioner.class);
//設定reduce數量
job.setNumReduceTasks(5);
6.3合并
Combiner合并:又稱之為map端的reduce,主要是通過對map局部的資料先進行一次reduce,進而來減少map端輸出資料頻繁發送給Reduce處理時所帶來的網絡壓力問題。通過這種提前對map輸出做一次局部reduce,這樣既可以減輕網絡壓力,又能提高效率。在mapreduce程式設計模型中預設是關閉的。
開啟Combiner
//shuffle 無須設定 自動處理
//設定Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);
//設定分區
job.setPartitionerClass(ProvincePartitioner.class);
...............
6.4mapreduce的運作原理
#1.計算切片
有幾個切片就有幾個map task
#2.環形緩存區
經過map函數的邏輯處理後的資料輸出之後,會通過OutPutCollector收集器将資料收集到環形緩存區儲存。
環形緩存區的大小預設為100M,當儲存的資料達到80%時,就将緩存區的資料溢出到磁盤上儲存。
#3.溢出
環形緩存區的資料達到其容量的80%時就會溢出到磁盤上進行儲存,在此過程中,程式會對資料進行分區(預設HashPartition)和排序(預設根據key進行快排)
緩存區不斷溢出的資料形成多個小檔案
#4.合并
溢出的多個小檔案各個區合并在一起(0區和0區合并成一個0區),形成大檔案
通過歸并排序保證區内的資料有序
#5.shuffle
從過程2到過程7之間,即map任務和reduce任務之間的資料流稱為shuffle(混洗),而過程5最能展現出混洗這一概念。一般情況下,一個reduce任務的輸入資料來自與多個map任務,多個reduce任務的情況下就會出現如過程5所示的,
每個reduce任務從map的輸出資料中擷取屬于自己的那個分區的資料。
#6.合并
運作reducetask的節點通過過程5,将來自多個map任務的屬于自己的分區資料下載下傳到本地磁盤工作目錄。這多個分區檔案通過歸并排序合并成大檔案,并根據key值分好組(key值相同的,value值會以疊代器的形式組在一起)。
#7.reducetask
reducetask從本地工作目錄擷取已經分好組并且排好序的資料,将資料進行reduce函數中的邏輯處理。
#8.輸出
每個reducetask輸出一個結果檔案。
6.5job作業送出過程
1.向ResourceManager請求運作一個mapreduce程式。
2.ResourceManager傳回hdfs位址,告訴用戶端将作業運作相關的資源檔案上傳到hdfs。
3.用戶端送出mr程式運作所需的檔案(包括作業的jar包,作業的配置檔案,分片資訊等)到hdfs上。
4.作業相關資訊送出完成後,用戶端用過調用ResourcrManager的submitApplication()方法送出作業。
5.ResourceManager将作業傳遞給排程器,排程器的預設排程政策是先進先出。
6.排程器尋找一台空閑的節點,并在該節點隔離出一個容器(container),容器中配置設定了cpu,記憶體等資源,并啟動MRAppmaster程序。
7.MRAppmaster根據需要運作多少個map任務,多少個reduce任務向ResourceManager請求資源。
8.ResourceManager配置設定相應數量的容器,并告知MRAppmaster容器在哪。
9.MRAppmaster啟動maptask。
10.maptask從HDFS擷取分片資料執行map邏輯。
11.map邏輯執行結束後,MRAppmaster啟動reducetask。
12.reducetask從maptask擷取屬于自己的分區資料執行reduce邏輯。
13.reduce邏輯結束後将結果資料儲存到HDFS上。
14.mapreduce作業結束後,MRAppmaster通知ResourceManager結束自己,讓ResourceManager回收所有資源。